From a7cb8960c013bfbefe260255c168b192a2dbabbb Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Tue, 5 Nov 2024 17:50:13 -0300 Subject: [PATCH 01/12] feat(Threads): Refresh Threads access tokens automatically. --- bc/channel/signals.py | 21 +++++++++++++ bc/channel/tasks.py | 71 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/bc/channel/signals.py b/bc/channel/signals.py index 8080b1d4..26a1c0f0 100644 --- a/bc/channel/signals.py +++ b/bc/channel/signals.py @@ -1,3 +1,6 @@ +import logging +from datetime import timedelta + from django.conf import settings from django.db.models.signals import post_save from django.dispatch import receiver @@ -8,9 +11,12 @@ from bc.core.utils.cloudfront import create_cache_invalidation from .models import Channel, Group +from .tasks import refresh_threads_access_token queue = get_queue("default") +logger = logging.getLogger(__name__) + @receiver(post_save, sender=Group) def group_handler(sender, instance=None, created=False, **kwargs): @@ -49,3 +55,18 @@ def channel_handler(sender, instance=None, created=False, **kwargs): interval=settings.RQ_RETRY_INTERVAL, ), ) + + # Schedule initial token refresh 2 days after creation + if created and instance.service == Channel.THREADS: + queue.enqueue_in( + timedelta(days=2), + refresh_threads_access_token, + channel_pk=instance.pk, + retry=Retry( + max=settings.RQ_MAX_NUMBER_OF_RETRIES, + interval=settings.RQ_RETRY_INTERVAL, + ), + ) + logger.info( + f"Scheduled new refresh token for newly created channel {instance} in 2 days" + ) diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index e3bd9add..53d7f7df 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -1,3 +1,7 @@ +import logging +from datetime import timedelta + +import requests from django.conf import settings from django_rq.queues import get_queue from rq import Retry @@ -6,6 +10,8 @@ queue = get_queue("default") +logger = logging.getLogger(__name__) + def enqueue_text_status_for_channel(channel: Channel, text: str) -> None: """ @@ -38,3 +44,68 @@ def enqueue_text_status_for_group(group: Group, text: str) -> None: """ for channel in group.channels.all(): enqueue_text_status_for_channel(channel, text) + + +def refresh_threads_access_token(channel_pk): + """ + Task to refresh the access token for a Threads channel. + """ + try: + channel = Channel.objects.get(pk=channel_pk) + except Channel.DoesNotExist: + logger.warning( + f"Trying to refresh Threads access token for channel {channel_pk} but it no longer exists." + ) + return + + if channel.service != Channel.THREADS: + logger.warning( + f"Trying to refresh Threads access token for a {channel.get_service_display()} channel. Aborting." + ) + return + + refresh_access_token_url = "https://graph.threads.net/refresh_access_token" + params = { + "grant_type": "th_refresh_token", + "access_token": channel.access_token, + } + response = requests.get(refresh_access_token_url, params=params) + + if response.status_code != 200: + logger.error( + f"Failed to refresh access token for Threads channel {channel}:" + f" {response.status_code} {response.text}" + ) + return + + data = response.json() + new_access_token = data.get("access_token") + expires_in = data.get("expires_in") # In seconds + + if new_access_token is None or expires_in is None: + logger.error( + f"Missing 'access_token' or 'expires_in' in refresh access token response for Threads channel {channel}: {data}\n" + f"If the issue persists, a new access token can be retrieved manually with the script again." + ) + return + + channel.access_token = new_access_token + channel.save() + + # Schedule the next token refresh + delay_seconds = ( + expires_in - 86400 + ) # Subtract one day to avoid expiration before the task runs + queue.enqueue_in( + timedelta(seconds=delay_seconds if delay_seconds > 0 else expires_in), + refresh_threads_access_token, + channel_pk=channel.pk, + retry=Retry( + max=settings.RQ_MAX_NUMBER_OF_RETRIES, + interval=settings.RQ_RETRY_INTERVAL, + ), + ) + logger.info( + f"Scheduled new refresh token for Threads channel {channel}" + f" in {delay_seconds} seconds ({expires_in / 86400:.1f} days)" + ) From 69ec643a4b966843a2cce0a034a6ec88852f5777 Mon Sep 17 00:00:00 2001 From: Elisa Date: Wed, 6 Nov 2024 20:52:00 -0300 Subject: [PATCH 02/12] style(channel_tasks): Improve readability Co-authored-by: Mike Lissner --- bc/channel/tasks.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index 53d7f7df..600f4b35 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -92,10 +92,8 @@ def refresh_threads_access_token(channel_pk): channel.access_token = new_access_token channel.save() - # Schedule the next token refresh - delay_seconds = ( - expires_in - 86400 - ) # Subtract one day to avoid expiration before the task runs + # Schedule the next token refresh one day before expiration + delay_seconds = expires_in - 86400 queue.enqueue_in( timedelta(seconds=delay_seconds if delay_seconds > 0 else expires_in), refresh_threads_access_token, From 3c74070d2eb7be8df106786c07c61b44eb6faf25 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Wed, 6 Nov 2024 21:20:29 -0300 Subject: [PATCH 03/12] fix(Threads): Add timeout to request to refresh access token --- bc/channel/tasks.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index 600f4b35..fd15e318 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -69,7 +69,11 @@ def refresh_threads_access_token(channel_pk): "grant_type": "th_refresh_token", "access_token": channel.access_token, } - response = requests.get(refresh_access_token_url, params=params) + response = requests.get( + refresh_access_token_url, + params=params, + timeout=10, + ) if response.status_code != 200: logger.error( From fca57c79a4c2833e267341a1c393b526f117c72f Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 03:06:34 -0300 Subject: [PATCH 04/12] refactor(Threads): remove enqueued tasks and centralize token refresh logic - Eliminated chained enqueued tasks used for token refreshing in Threads integration. - Moved token validation and refresh logic into the Channel model's validate_access_token method. - Adjusted ThreadsAPI methods to handle token expiration and refreshing internally. - Implemented Redis-based locking mechanism to prevent concurrent token refreshes. --- bc/channel/models.py | 55 +++++++++++- bc/channel/signals.py | 17 ---- bc/channel/tasks.py | 69 --------------- bc/channel/utils/connectors/base.py | 4 + bc/channel/utils/connectors/threads.py | 5 +- .../utils/connectors/threads_api/client.py | 84 ++++++++++++++++++- bc/subscription/tasks.py | 3 + scripts/get-threads-keys.py | 32 ++++++- 8 files changed, 177 insertions(+), 92 deletions(-) diff --git a/bc/channel/models.py b/bc/channel/models.py index 1a645afb..974010e9 100644 --- a/bc/channel/models.py +++ b/bc/channel/models.py @@ -1,17 +1,26 @@ +import logging + from django.db import models from django.urls import reverse +from redis.exceptions import LockError from bc.core.models import AbstractDateTimeModel from bc.core.utils.color import format_color_str from bc.sponsorship.models import Sponsorship from bc.users.models import User -from .utils.connectors.base import BaseAPIConnector +from ..core.utils.redis import make_redis_interface +from .utils.connectors.base import ( + BaseAPIConnector, + RefreshableBaseAPIConnector, +) from .utils.connectors.bluesky import BlueskyConnector from .utils.connectors.masto import MastodonConnector, get_handle_parts from .utils.connectors.threads import ThreadsConnector from .utils.connectors.twitter import TwitterConnector +logger = logging.getLogger(__name__) + class Group(AbstractDateTimeModel): name = models.CharField( @@ -70,6 +79,7 @@ class Channel(AbstractDateTimeModel): (BLUESKY, "Bluesky"), (THREADS, "Threads"), ) + CHANNELS_TO_REFRESH = [THREADS] service = models.PositiveSmallIntegerField( help_text="Type of the service", choices=CHANNELS, @@ -107,7 +117,9 @@ class Channel(AbstractDateTimeModel): blank=True, ) - def get_api_wrapper(self) -> BaseAPIConnector: + def get_api_wrapper( + self, + ) -> BaseAPIConnector | RefreshableBaseAPIConnector: match self.service: case self.TWITTER: return TwitterConnector( @@ -145,6 +157,45 @@ def self_url(self): f"Channel.self_url() not yet implemented for service {self.service}" ) + def validate_access_token(self): + if self.service not in self.CHANNELS_TO_REFRESH: + return + r = make_redis_interface("CACHE") + lock_key = self._get_refresh_lock_key() + lock = r.lock(lock_key, sleep=1, timeout=60) + blocking_timeout = 60 + try: + lock.acquire(blocking=True, blocking_timeout=blocking_timeout) + self._refresh_access_token() + except LockError as e: + logger.error( + f"LockError while acquiring lock for channel {self}: {e}" + ) + raise e + finally: + if lock.owned(): + try: + lock.release() + except Exception as e: + logger.error( + f"Error releasing lock for channel {self}:\n{e}" + ) + + def _refresh_access_token(self): + api = self.get_api_wrapper() + try: + refreshed, access_token = api.validate_access_token() + if refreshed: + self.access_token = access_token + self.save() + except Exception as e: + logger.error( + f"Error when trying to refresh token for channel {self.pk}:\n{e}" + ) + + def _get_refresh_lock_key(self): + return f"token_refresh_lock_{self.account_id}@{self.get_service_display()}" + def __str__(self) -> str: if self.account: return f"{self.pk}: {self.account}" diff --git a/bc/channel/signals.py b/bc/channel/signals.py index 26a1c0f0..2eabe3db 100644 --- a/bc/channel/signals.py +++ b/bc/channel/signals.py @@ -1,5 +1,4 @@ import logging -from datetime import timedelta from django.conf import settings from django.db.models.signals import post_save @@ -11,7 +10,6 @@ from bc.core.utils.cloudfront import create_cache_invalidation from .models import Channel, Group -from .tasks import refresh_threads_access_token queue = get_queue("default") @@ -55,18 +53,3 @@ def channel_handler(sender, instance=None, created=False, **kwargs): interval=settings.RQ_RETRY_INTERVAL, ), ) - - # Schedule initial token refresh 2 days after creation - if created and instance.service == Channel.THREADS: - queue.enqueue_in( - timedelta(days=2), - refresh_threads_access_token, - channel_pk=instance.pk, - retry=Retry( - max=settings.RQ_MAX_NUMBER_OF_RETRIES, - interval=settings.RQ_RETRY_INTERVAL, - ), - ) - logger.info( - f"Scheduled new refresh token for newly created channel {instance} in 2 days" - ) diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index fd15e318..fcbf1a22 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -1,7 +1,5 @@ import logging -from datetime import timedelta -import requests from django.conf import settings from django_rq.queues import get_queue from rq import Retry @@ -44,70 +42,3 @@ def enqueue_text_status_for_group(group: Group, text: str) -> None: """ for channel in group.channels.all(): enqueue_text_status_for_channel(channel, text) - - -def refresh_threads_access_token(channel_pk): - """ - Task to refresh the access token for a Threads channel. - """ - try: - channel = Channel.objects.get(pk=channel_pk) - except Channel.DoesNotExist: - logger.warning( - f"Trying to refresh Threads access token for channel {channel_pk} but it no longer exists." - ) - return - - if channel.service != Channel.THREADS: - logger.warning( - f"Trying to refresh Threads access token for a {channel.get_service_display()} channel. Aborting." - ) - return - - refresh_access_token_url = "https://graph.threads.net/refresh_access_token" - params = { - "grant_type": "th_refresh_token", - "access_token": channel.access_token, - } - response = requests.get( - refresh_access_token_url, - params=params, - timeout=10, - ) - - if response.status_code != 200: - logger.error( - f"Failed to refresh access token for Threads channel {channel}:" - f" {response.status_code} {response.text}" - ) - return - - data = response.json() - new_access_token = data.get("access_token") - expires_in = data.get("expires_in") # In seconds - - if new_access_token is None or expires_in is None: - logger.error( - f"Missing 'access_token' or 'expires_in' in refresh access token response for Threads channel {channel}: {data}\n" - f"If the issue persists, a new access token can be retrieved manually with the script again." - ) - return - - channel.access_token = new_access_token - channel.save() - - # Schedule the next token refresh one day before expiration - delay_seconds = expires_in - 86400 - queue.enqueue_in( - timedelta(seconds=delay_seconds if delay_seconds > 0 else expires_in), - refresh_threads_access_token, - channel_pk=channel.pk, - retry=Retry( - max=settings.RQ_MAX_NUMBER_OF_RETRIES, - interval=settings.RQ_RETRY_INTERVAL, - ), - ) - logger.info( - f"Scheduled new refresh token for Threads channel {channel}" - f" in {delay_seconds} seconds ({expires_in / 86400:.1f} days)" - ) diff --git a/bc/channel/utils/connectors/base.py b/bc/channel/utils/connectors/base.py index 3a209516..99268912 100644 --- a/bc/channel/utils/connectors/base.py +++ b/bc/channel/utils/connectors/base.py @@ -70,3 +70,7 @@ def add_status( int: The unique identifier for the new status. """ ... + + +class RefreshableBaseAPIConnector(BaseAPIConnector, Protocol): + def validate_access_token(self) -> tuple[bool, str]: ... diff --git a/bc/channel/utils/connectors/threads.py b/bc/channel/utils/connectors/threads.py index 87c0033b..ce0d8b48 100644 --- a/bc/channel/utils/connectors/threads.py +++ b/bc/channel/utils/connectors/threads.py @@ -11,7 +11,7 @@ class ThreadsConnector: """ A connector for interfacing with the Threads API, which complies with - the BaseAPIConnector protocol. + the RefreshableBaseAPIConnector protocol. """ def __init__( @@ -32,6 +32,9 @@ def get_api_object(self, _version=None) -> ThreadsAPI: ) return api + def validate_access_token(self) -> tuple[bool, str]: + return self.api.validate_access_token() + def upload_media(self, media: bytes, _alt_text=None) -> str: """ Uploads media to public storage for Threads API compatibility. diff --git a/bc/channel/utils/connectors/threads_api/client.py b/bc/channel/utils/connectors/threads_api/client.py index e7267fce..efb75c7a 100644 --- a/bc/channel/utils/connectors/threads_api/client.py +++ b/bc/channel/utils/connectors/threads_api/client.py @@ -1,11 +1,12 @@ import logging import time import uuid +from datetime import datetime, timedelta, timezone import requests -from django.conf import settings from bc.core.utils.images import convert_to_jpeg, resize_image +from bc.core.utils.redis import make_redis_interface from bc.core.utils.s3 import put_object_in_bucket logger = logging.getLogger(__name__) @@ -165,6 +166,87 @@ def attempt_post( return None return response + def validate_access_token(self) -> tuple[bool, str]: + r = make_redis_interface("CACHE") + refreshed = False + + try: + cached_expiration_date = r.get(self._get_expiration_key()) + except Exception as e: + logger.error( + f"Could not retrieve cached token, will attempt to refresh.\n" + f"Redis error: {e}" + ) + return self.refresh_access_token(), self._access_token + + if cached_expiration_date is None: + return self.refresh_access_token(), self._access_token + + expiration_date = datetime.fromisoformat(str(cached_expiration_date)) + delta = expiration_date - datetime.now(timezone.utc) + will_expire_soon = delta <= timedelta(days=2) + + if will_expire_soon: + refreshed = self.refresh_access_token() + + return refreshed, self._access_token + + def refresh_access_token(self) -> bool: + refresh_access_token_url = ( + "https://graph.threads.net/refresh_access_token" + ) + params = { + "grant_type": "th_refresh_token", + "access_token": self._access_token, + } + try: + response = requests.get( + refresh_access_token_url, + params=params, + timeout=10, + ) + response.raise_for_status() + except requests.exceptions.RequestException as err: + logger.error( + f"Failed to refresh access token for Threads account {self._account_id}:\n" + f"{err}" + ) + return False + + data = response.json() + new_access_token = data.get("access_token") + expires_in = data.get("expires_in") # In seconds + + if new_access_token is None or expires_in is None: + logger.error( + f"Missing 'access_token' or 'expires_in' in refresh access token response for Threads account {self._account_id}. " + f"If the issue persists, a new access token can be retrieved manually with the script again.\n" + f"Response data: {data}" + ) + return False + + self._access_token = new_access_token + self._set_token_expiration_in_cache(expires_in) + + return True + + def _set_token_expiration_in_cache(self, expires_in: int): + delay = timedelta(seconds=expires_in) + expiration_date = (datetime.now(timezone.utc) + delay).isoformat() + r = make_redis_interface("CACHE") + key = self._get_expiration_key() + try: + r.set( + key, + expiration_date.encode("utf-8"), + ex=expires_in, + ) + except Exception as e: + logger.error(f"Could not set {key} in cache:\n{e}") + + def _get_expiration_key(self): + return f"threads_token_expiration_{self._account_id}" + @staticmethod def resize_and_upload_to_public_storage(media: bytes) -> str: """ diff --git a/bc/subscription/tasks.py b/bc/subscription/tasks.py index 6bd42559..18a13e43 100644 --- a/bc/subscription/tasks.py +++ b/bc/subscription/tasks.py @@ -110,6 +110,7 @@ def enqueue_posts_for_new_case( initial_complaint_link=initial_complaint_link, ) + channel.validate_access_token() api = channel.get_api_wrapper() sponsor_message = None @@ -423,6 +424,8 @@ def make_post_for_webhook_event( if sponsor_text and files: files = add_sponsored_text_to_thumbnails(files, sponsor_text) + channel.validate_access_token() + api = channel.get_api_wrapper() api_post_id = api.add_status(message, image, files) diff --git a/scripts/get-threads-keys.py b/scripts/get-threads-keys.py index d8587856..37f27f8d 100644 --- a/scripts/get-threads-keys.py +++ b/scripts/get-threads-keys.py @@ -1,5 +1,9 @@ +import json +from datetime import datetime, timedelta, timezone + import environ import requests +from redis import Redis env = environ.FileAwareEnv() @@ -14,6 +18,11 @@ USER_INFO_BASE_URL = "https://graph.threads.net/v1.0" +REDIS_HOST = env("REDIS_HOST", default="redis://bc2-redis") +REDIS_PORT = env("REDIS_PORT", default=6379) + +r = Redis.from_url(url=f"{REDIS_HOST}:{REDIS_PORT}", db=1) + def main(): if not APP_ID or not APP_SECRET: @@ -88,10 +97,29 @@ def main(): print(f"Account id: {user_id}") print("Enable: True") print(f"Access Token: {long_access_token}") - if expires_in is not None: + if expires_in is None: + print("Could not retrieve expiration time for access token") + return + + # Set expiration date in cache so we can refresh the token automatically + delay = timedelta(seconds=expires_in - 20) + expiration_date = (datetime.now(timezone.utc) + delay).isoformat() + expiration_key = f"threads_token_expiration_{user_id}" + print( + f"\nNote: Token will expire on {expiration_date} unless refreshed.\n" + ) + + try: + r.set( + expiration_key, + expiration_date, + ex=expires_in, + ) print( - f"\nNote: Token will expire in {expires_in / 86400:.1f} days unless refreshed." + f"Expiration date saved in cache as {expiration_key} for {delay} seconds." ) + except Exception as e: + print(f"Could not set expiration date in cache:\n{e}") if __name__ == "__main__": From ef3f923bd1710fa835e701006c7fb6e15910de5f Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 09:12:00 -0300 Subject: [PATCH 05/12] feat(threads_script): Add timeouts to requests --- scripts/get-threads-keys.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/scripts/get-threads-keys.py b/scripts/get-threads-keys.py index 37f27f8d..0929c832 100644 --- a/scripts/get-threads-keys.py +++ b/scripts/get-threads-keys.py @@ -50,6 +50,7 @@ def main(): "grant_type": "authorization_code", "redirect_uri": THREADS_CALLBACK, }, + timeout=10, ) if response.status_code != 200: @@ -67,7 +68,11 @@ def main(): "client_secret": APP_SECRET, "access_token": short_lived_access_token, } - response = requests.get(LONG_LIVED_ACCESS_TOKEN_URL, params=params) + response = requests.get( + LONG_LIVED_ACCESS_TOKEN_URL, + params=params, + timeout=10, + ) if response.status_code != 200: raise Exception( @@ -79,7 +84,10 @@ def main(): expires_in = long_lived_data.get("expires_in") user_info_url = f"{USER_INFO_BASE_URL}/{user_id}?fields=username&access_token={long_access_token}" - response = requests.get(user_info_url) + response = requests.get( + user_info_url, + timeout=10, + ) if response.status_code != 200: raise Exception( From a16c9384c8c42fbff962504e54789f8d5696ea6a Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 09:46:35 -0300 Subject: [PATCH 06/12] docs(Threads): Add docstrings to methods related to token validation --- bc/channel/models.py | 11 +++++++ bc/channel/utils/connectors/threads.py | 11 +++++++ .../utils/connectors/threads_api/client.py | 32 +++++++++++++++++-- 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/bc/channel/models.py b/bc/channel/models.py index 974010e9..7cca6d3a 100644 --- a/bc/channel/models.py +++ b/bc/channel/models.py @@ -158,6 +158,12 @@ def self_url(self): ) def validate_access_token(self): + """ + Validates and refreshes the access token for the channel if necessary. + + This method implements a locking mechanism to avoid multiple tasks + from concurrently trying to validate the same token. + """ if self.service not in self.CHANNELS_TO_REFRESH: return r = make_redis_interface("CACHE") @@ -165,7 +171,9 @@ def validate_access_token(self): lock = r.lock(lock_key, sleep=1, timeout=60) blocking_timeout = 60 try: + # Use a blocking lock to wait until locking task is finished lock.acquire(blocking=True, blocking_timeout=blocking_timeout) + # Then perform action to validate self._refresh_access_token() except LockError as e: logger.error( @@ -194,6 +202,9 @@ def _refresh_access_token(self): ) def _get_refresh_lock_key(self): + """ + Constructs the Redis key used for locking during access token refresh. + """ return f"token_refresh_lock_{self.account_id}@{self.get_service_display()}" def __str__(self) -> str: diff --git a/bc/channel/utils/connectors/threads.py b/bc/channel/utils/connectors/threads.py index ce0d8b48..385f3f3f 100644 --- a/bc/channel/utils/connectors/threads.py +++ b/bc/channel/utils/connectors/threads.py @@ -33,6 +33,17 @@ def get_api_object(self, _version=None) -> ThreadsAPI: return api def validate_access_token(self) -> tuple[bool, str]: + """ + Ensures that the access token used by the connector is up-to-date. + + This method delegates the validation of the access token to the underlying + `ThreadsAPI` instance by checking the access token's expiration date and + refreshing it if necessary. + + Returns: + tuple[bool, str]: A tuple where the first element is a boolean + indicating whether the token was refreshed, and the second element is the current access token. + """ return self.api.validate_access_token() def upload_media(self, media: bytes, _alt_text=None) -> str: diff --git a/bc/channel/utils/connectors/threads_api/client.py b/bc/channel/utils/connectors/threads_api/client.py index efb75c7a..52320291 100644 --- a/bc/channel/utils/connectors/threads_api/client.py +++ b/bc/channel/utils/connectors/threads_api/client.py @@ -167,6 +167,17 @@ def attempt_post( return response def validate_access_token(self) -> tuple[bool, str]: + """ + Validates the current access token and refreshes it if necessary. + + This method checks the expiration date of the access token stored in the Redis cache. + If the expiration date is missing, expired, or will expire within two days, + it attempts to refresh the token by calling `refresh_access_token`. + + Returns: + tuple[bool, str]: A tuple where the first element is a boolean + indicating whether the token was refreshed, and the second element is the current access token. + """ r = make_redis_interface("CACHE") refreshed = False @@ -192,6 +203,14 @@ def validate_access_token(self) -> tuple[bool, str]: return refreshed, self._access_token def refresh_access_token(self) -> bool: + """ + Refreshes the access token by making a request to the Threads API. + + If the refresh is successful, it updates the access token and its expiration date in the cache. + + Returns: + bool: `True` if the access token was successfully refreshed and updated; `False` otherwise. + """ refresh_access_token_url = ( "https://graph.threads.net/refresh_access_token" ) @@ -231,6 +250,12 @@ def refresh_access_token(self) -> bool: return True def _set_token_expiration_in_cache(self, expires_in: int): + """ + Stores the access token's expiration date in the Redis cache. + + Args: + expires_in (int): The number of seconds until the access token expires. + """ delay = timedelta(seconds=expires_in) expiration_date = (datetime.now(timezone.utc) + delay).isoformat() r = make_redis_interface("CACHE") @@ -239,12 +264,15 @@ def _set_token_expiration_in_cache(self, expires_in: int): r.set( key, expiration_date.encode("utf-8"), - ex=expires_in, + ex=expires_in, # ensure the cache entry expires when the token does ) except Exception as e: logger.error(f"Could not set {key} in cache:\n{e}") - def _get_expiration_key(self): + def _get_expiration_key(self) -> str: + """ + Returns the Redis key used for storing the access token's expiration date. + """ return f"threads_token_expiration_{self._account_id}" @staticmethod From fa9e7bf88e29b53313e176c48c22af3bfd9dc94f Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 09:53:36 -0300 Subject: [PATCH 07/12] docs(Threads): Add docstrings to RefreshableBaseAPIConnector --- bc/channel/utils/connectors/base.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/bc/channel/utils/connectors/base.py b/bc/channel/utils/connectors/base.py index 99268912..195d7c2d 100644 --- a/bc/channel/utils/connectors/base.py +++ b/bc/channel/utils/connectors/base.py @@ -73,4 +73,17 @@ def add_status( class RefreshableBaseAPIConnector(BaseAPIConnector, Protocol): - def validate_access_token(self) -> tuple[bool, str]: ... + """ + Extends BaseAPIConnector to add logic to validate access tokens. + """ + + def validate_access_token(self) -> tuple[bool, str]: + """ + Validates the access token and refreshes it if necessary. + + Returns: + tuple[bool, str]: A tuple where the first element is a boolean + indicating if the token was refreshed, and the second element + is the current access token. + """ + ... From de734913a9b33088c65e6233b1ccb544d335097b Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 10:00:21 -0300 Subject: [PATCH 08/12] refactor(channels): Remove unused code --- bc/channel/signals.py | 4 ---- bc/channel/tasks.py | 4 ---- 2 files changed, 8 deletions(-) diff --git a/bc/channel/signals.py b/bc/channel/signals.py index 2eabe3db..8080b1d4 100644 --- a/bc/channel/signals.py +++ b/bc/channel/signals.py @@ -1,5 +1,3 @@ -import logging - from django.conf import settings from django.db.models.signals import post_save from django.dispatch import receiver @@ -13,8 +11,6 @@ queue = get_queue("default") -logger = logging.getLogger(__name__) - @receiver(post_save, sender=Group) def group_handler(sender, instance=None, created=False, **kwargs): diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index fcbf1a22..e3bd9add 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -1,5 +1,3 @@ -import logging - from django.conf import settings from django_rq.queues import get_queue from rq import Retry @@ -8,8 +6,6 @@ queue = get_queue("default") -logger = logging.getLogger(__name__) - def enqueue_text_status_for_channel(channel: Channel, text: str) -> None: """ From c73ba9d612d74cd565571263031cd55cbaade3c8 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Fri, 8 Nov 2024 10:04:32 -0300 Subject: [PATCH 09/12] feat(channels): Validate access token before posting text status --- bc/channel/tasks.py | 1 + 1 file changed, 1 insertion(+) diff --git a/bc/channel/tasks.py b/bc/channel/tasks.py index e3bd9add..9ec1c2f4 100644 --- a/bc/channel/tasks.py +++ b/bc/channel/tasks.py @@ -16,6 +16,7 @@ def enqueue_text_status_for_channel(channel: Channel, text: str) -> None: channel (Channel): The channel object. text (str): Message for the new status. """ + channel.validate_access_token() api = channel.get_api_wrapper() queue.enqueue( api.add_status, From 8c785fef41d0c2b110144523074e7768ca7eca98 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Wed, 13 Nov 2024 12:46:41 -0300 Subject: [PATCH 10/12] refactor(channels): Invert condition to use early return and remove a level of indentation --- bc/channel/models.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/bc/channel/models.py b/bc/channel/models.py index 7cca6d3a..ad0caf9c 100644 --- a/bc/channel/models.py +++ b/bc/channel/models.py @@ -181,13 +181,12 @@ def validate_access_token(self): ) raise e finally: - if lock.owned(): - try: - lock.release() - except Exception as e: - logger.error( - f"Error releasing lock for channel {self}:\n{e}" - ) + if not lock.owned(): + return + try: + lock.release() + except Exception as e: + logger.error(f"Error releasing lock for channel {self}:\n{e}") def _refresh_access_token(self): api = self.get_api_wrapper() From 25d5f231f34e14d3e0d708c77944f2726b1e5d43 Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Wed, 13 Nov 2024 20:12:56 -0300 Subject: [PATCH 11/12] refactor(ThreadsAPI): Avoid instantiating a new Redis instance each time it's needed --- bc/channel/utils/connectors/threads_api/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bc/channel/utils/connectors/threads_api/client.py b/bc/channel/utils/connectors/threads_api/client.py index 52320291..e48c44e8 100644 --- a/bc/channel/utils/connectors/threads_api/client.py +++ b/bc/channel/utils/connectors/threads_api/client.py @@ -14,6 +14,8 @@ _BASE_API_URL = "https://graph.threads.net/v1.0" +r = make_redis_interface("CACHE") + class ThreadsAPI: """ @@ -178,7 +180,6 @@ def validate_access_token(self) -> tuple[bool, str]: tuple[bool, str]: A tuple where the first element is a boolean indicating whether the token was refreshed, and the second element is the current access token. """ - r = make_redis_interface("CACHE") refreshed = False try: @@ -258,7 +259,6 @@ def _set_token_expiration_in_cache(self, expires_in: int): """ delay = timedelta(seconds=expires_in) expiration_date = (datetime.now(timezone.utc) + delay).isoformat() - r = make_redis_interface("CACHE") key = self._get_expiration_key() try: r.set( From 523b160da5594703790b17a092e54d9e39d7db2a Mon Sep 17 00:00:00 2001 From: Elisa Anguita Date: Thu, 14 Nov 2024 13:44:12 -0300 Subject: [PATCH 12/12] refactor(channel_models): Avoid instantiating a new Redis instance each time it's needed --- bc/channel/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bc/channel/models.py b/bc/channel/models.py index ad0caf9c..14913ec3 100644 --- a/bc/channel/models.py +++ b/bc/channel/models.py @@ -21,6 +21,8 @@ logger = logging.getLogger(__name__) +r = make_redis_interface("CACHE") + class Group(AbstractDateTimeModel): name = models.CharField( @@ -166,7 +168,6 @@ def validate_access_token(self): """ if self.service not in self.CHANNELS_TO_REFRESH: return - r = make_redis_interface("CACHE") lock_key = self._get_refresh_lock_key() lock = r.lock(lock_key, sleep=1, timeout=60) blocking_timeout = 60