Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Threads): Refresh Threads access tokens automatically. #629

Merged
merged 13 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions bc/channel/signals.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import logging
from datetime import timedelta

from django.conf import settings
from django.db.models.signals import post_save
from django.dispatch import receiver
Expand All @@ -8,9 +11,12 @@
from bc.core.utils.cloudfront import create_cache_invalidation

from .models import Channel, Group
from .tasks import refresh_threads_access_token

queue = get_queue("default")

logger = logging.getLogger(__name__)


@receiver(post_save, sender=Group)
def group_handler(sender, instance=None, created=False, **kwargs):
Expand Down Expand Up @@ -49,3 +55,18 @@ def channel_handler(sender, instance=None, created=False, **kwargs):
interval=settings.RQ_RETRY_INTERVAL,
),
)

# Schedule initial token refresh 2 days after creation
if created and instance.service == Channel.THREADS:
queue.enqueue_in(
timedelta(days=2),
refresh_threads_access_token,
channel_pk=instance.pk,
retry=Retry(
max=settings.RQ_MAX_NUMBER_OF_RETRIES,
interval=settings.RQ_RETRY_INTERVAL,
),
)
logger.info(
f"Scheduled new refresh token for newly created channel {instance} in 2 days"
)
71 changes: 71 additions & 0 deletions bc/channel/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import logging
from datetime import timedelta

import requests
from django.conf import settings
from django_rq.queues import get_queue
from rq import Retry
Expand All @@ -6,6 +10,8 @@

queue = get_queue("default")

logger = logging.getLogger(__name__)


def enqueue_text_status_for_channel(channel: Channel, text: str) -> None:
"""
Expand Down Expand Up @@ -38,3 +44,68 @@ def enqueue_text_status_for_group(group: Group, text: str) -> None:
"""
for channel in group.channels.all():
enqueue_text_status_for_channel(channel, text)


def refresh_threads_access_token(channel_pk):
"""
Task to refresh the access token for a Threads channel.
"""
try:
channel = Channel.objects.get(pk=channel_pk)
except Channel.DoesNotExist:
logger.warning(
f"Trying to refresh Threads access token for channel {channel_pk} but it no longer exists."
)
return

if channel.service != Channel.THREADS:
logger.warning(
f"Trying to refresh Threads access token for a {channel.get_service_display()} channel. Aborting."
)
return

refresh_access_token_url = "https://graph.threads.net/refresh_access_token"
params = {
"grant_type": "th_refresh_token",
"access_token": channel.access_token,
}
response = requests.get(refresh_access_token_url, params=params)
mlissner marked this conversation as resolved.
Show resolved Hide resolved

if response.status_code != 200:
logger.error(
f"Failed to refresh access token for Threads channel {channel}:"
f" {response.status_code} {response.text}"
)
return

data = response.json()
new_access_token = data.get("access_token")
expires_in = data.get("expires_in") # In seconds

if new_access_token is None or expires_in is None:
logger.error(
f"Missing 'access_token' or 'expires_in' in refresh access token response for Threads channel {channel}: {data}\n"
f"If the issue persists, a new access token can be retrieved manually with the script again."
)
return

channel.access_token = new_access_token
channel.save()

# Schedule the next token refresh
delay_seconds = (
expires_in - 86400
) # Subtract one day to avoid expiration before the task runs
elisa-a-v marked this conversation as resolved.
Show resolved Hide resolved
queue.enqueue_in(
timedelta(seconds=delay_seconds if delay_seconds > 0 else expires_in),
refresh_threads_access_token,
channel_pk=channel.pk,
retry=Retry(
max=settings.RQ_MAX_NUMBER_OF_RETRIES,
interval=settings.RQ_RETRY_INTERVAL,
),
)
mlissner marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
f"Scheduled new refresh token for Threads channel {channel}"
f" in {delay_seconds} seconds ({expires_in / 86400:.1f} days)"
)