Async Django app #2663

Status: Closed. Wants to merge 8 commits.
5 changes: 2 additions & 3 deletions api/Dockerfile
@@ -101,7 +101,6 @@ COPY --chown=opener . /api/
# Collect static assets, these are used by the next stage, `nginx`
RUN env \
SETUP_ES="False" \
-STATIC_ROOT="/static" \
Collaborator Author: This is now correctly configured in settings for local and production deployments (which are intercepted by Nginx anyway), so there's no need to repeat the configuration here.
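For context, the settings-side counterpart might look like the sketch below; the exact handling in the PR's settings modules is not shown in this diff, so treat the names as assumptions.

```python
# Hypothetical sketch of a settings module reading STATIC_ROOT from the
# environment with a default, in the decouple style this codebase already uses.
from decouple import config

STATIC_ROOT = config("STATIC_ROOT", default="/static")
```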

DJANGO_SECRET_KEY="any string" \
python manage.py collectstatic

@@ -117,8 +116,8 @@ EXPOSE 8000 3000
# Wait for ES to accept connections
ENTRYPOINT ["./run.sh"]

-# Run Django dev server, can be overridden from Docker Compose
-CMD ["python", "manage.py", "runserver", "0.0.0.0:8000"]
+# Production should run gunicorn
+CMD ["gunicorn"]

#########
# NGINX #
12 changes: 8 additions & 4 deletions api/Pipfile
@@ -16,13 +16,15 @@ pook = "~=1.0"
pgcli = "~=3.5"
freezegun = "~=1.2.2"
pytest-sugar = "~=0.9"
+pytest-asyncio = "~=0.21.1"

[packages]
aiohttp = "~=3.8"
aws-requests-auth = "~=0.4"
boto3 = "~=1.26"
deepdiff = "~=6.2"
Django = "~=4.2"
+django-async-redis = "~=0.2"
django-braces = "~=1.15"
django-cors-headers = "~=4.1"
django-cron = "~=0.6"
@@ -37,8 +39,6 @@ djangorestframework = "~=3.14"
drf-spectacular = "*"
elasticsearch-dsl = "~=7.4"
future = "~=0.18"
-gevent = "~=22.10"
-gunicorn = "~=20.1"
hvac = "~=1.0"
ipaddress = "~=1.0"
limit = "~=0.2"
@@ -50,8 +50,12 @@ python3-openid = "~=3.2"
redlock-py = "~=1.0"
requests-oauthlib = "~=1.3"
sentry-sdk = "~=1.21"
-wsgi-basic-auth = "~=1.1"
-django-split-settings = "*"
+django-split-settings = "~=1.2.0"
+adrf = "~=0.1.1"
+uvicorn = "~=0.23.1"
+gunicorn = "~=20.1"
+asgiref = "~=3.7.2"
+httpx = "~=0.24"

[requires]
python_version = "3.11"
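`pytest-asyncio` in `[dev-packages]` is what lets the test suite await the now-async helpers directly. A sketch of the pattern, with placeholder arguments:

```python
# Hypothetical test sketch; the arguments are placeholders, not real fixtures.
import pytest

from api.utils.check_dead_links import check_dead_links


@pytest.mark.asyncio
async def test_check_dead_links_accepts_empty_results():
    # Real tests would pass a query hash, a slice offset, ES hits, and URLs.
    await check_dead_links("query-hash", 0, [], [])
```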
338 changes: 117 additions & 221 deletions api/Pipfile.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions api/api/controllers/search_controller.py
@@ -19,7 +19,7 @@
from api.constants.sorting import INDEXED_ON
from api.serializers import media_serializers
from api.utils import tallies
-from api.utils.check_dead_links import check_dead_links
+from api.utils.check_dead_links import sync_check_dead_links
from api.utils.dead_link_mask import get_query_hash, get_query_mask
from api.utils.search_context import SearchContext

@@ -177,7 +177,7 @@ def _post_process_results(

if filter_dead:
query_hash = get_query_hash(s)
-        check_dead_links(query_hash, start, results, to_validate)
+        sync_check_dead_links(query_hash, start, results, to_validate)

if len(results) == 0:
# first page is all dead links
30 changes: 30 additions & 0 deletions api/api/utils/aiohttp.py
@@ -0,0 +1,30 @@
"""
Safely share aiohttp ClientSession for the entire application.

CC BY-SA 4.0 by StackOverflow user https://stackoverflow.com/users/8601760/aaron
https://stackoverflow.com/a/72634224
"""

import asyncio

import aiohttp

from conf.asgi import application


# aiohttp recommends reusing the same session for the whole application
# https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
_SESSION: aiohttp.ClientSession = None

_lock = asyncio.Lock()


async def get_aiohttp_session():
    global _SESSION

    async with _lock:
        if not _SESSION or _SESSION.closed:
            _SESSION = aiohttp.ClientSession()
            application.on_shutdown.append(_SESSION.close)

        return _SESSION
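A consumer awaits the accessor and uses the returned session without closing it; cleanup is deferred to the ASGI shutdown hook appended above. For example:

```python
# Example consumer mirroring how the helpers in this PR use the shared
# session; fetch_status is an illustrative name, not part of the PR.
from api.utils.aiohttp import get_aiohttp_session


async def fetch_status(url: str) -> int:
    session = await get_aiohttp_session()
    # Never close the shared session here; the ASGI shutdown hook owns it.
    async with session.head(url, allow_redirects=False) as response:
        return response.status
```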
24 changes: 14 additions & 10 deletions api/api/utils/check_dead_links/__init__.py
@@ -10,6 +10,7 @@
from decouple import config
from elasticsearch_dsl.response import Hit

+from api.utils.aiohttp import get_aiohttp_session
from api.utils.check_dead_links.provider_status_mappings import provider_status_mappings
from api.utils.dead_link_mask import get_query_mask, save_query_mask

@@ -32,28 +33,28 @@ def _get_expiry(status, default):
return config(f"LINK_VALIDATION_CACHE_EXPIRY__{status}", default=default, cast=int)


-async def _head(url: str, session: aiohttp.ClientSession) -> tuple[str, int]:
+async def _head(url: str, **kwargs) -> tuple[str, int]:
Collaborator Author: Changes in this module are just to support the reusable aiohttp session. We should have been doing that already anyway.

+    session = await get_aiohttp_session()
    try:
-        async with session.head(url, allow_redirects=False) as response:
+        async with session.head(
+            url, allow_redirects=False, headers=HEADERS, **kwargs
+        ) as response:
            return url, response.status
    except (aiohttp.ClientError, asyncio.TimeoutError) as exception:
        _log_validation_failure(exception)
        return url, -1


# https://stackoverflow.com/q/55259755
@async_to_sync
async def _make_head_requests(urls: list[str]) -> list[tuple[str, int]]:
-    tasks = []
-    timeout = aiohttp.ClientTimeout(total=2)
-    async with aiohttp.ClientSession(headers=HEADERS, timeout=timeout) as session:
-        tasks = [asyncio.ensure_future(_head(url, session)) for url in urls]
-        responses = asyncio.gather(*tasks)
-        await responses
+    tasks = [asyncio.ensure_future(_head(url, timeout=2)) for url in urls]
+    responses = asyncio.gather(*tasks)
+    await responses
    return responses.result()


-def check_dead_links(
+async def check_dead_links(
query_hash: str, start_slice: int, results: list[Hit], image_urls: list[str]
) -> None:
"""
@@ -85,7 +86,7 @@ def check_dead_links(
to_verify[url] = idx
logger.debug(f"len(to_verify)={len(to_verify)}")

-    verified = _make_head_requests(to_verify.keys())
+    verified = await _make_head_requests(to_verify.keys())

# Cache newly verified image statuses.
to_cache = {CACHE_PREFIX + url: status for url, status in verified}
@@ -161,6 +162,9 @@
)


+sync_check_dead_links = async_to_sync(check_dead_links)


def _log_validation_failure(exception):
logger = parent_logger.getChild("_log_validation_failure")
logger.warning(f"Failed to validate image! Reason: {exception}")
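With the `async_to_sync` bridge, the module now exposes both entry points. Roughly, assuming hypothetical callers:

```python
# Sketch of the two call paths; the caller function names are illustrative.
from api.utils.check_dead_links import check_dead_links, sync_check_dead_links


def from_sync_code(query_hash, start, results, to_validate):
    # Synchronous callers (like search_controller above) use the wrapped form,
    # which runs the coroutine on an event loop for them.
    sync_check_dead_links(query_hash, start, results, to_validate)


async def from_async_code(query_hash, start, results, to_validate):
    # Async callers can await the coroutine directly and skip that overhead.
    await check_dead_links(query_hash, start, results, to_validate)
```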
37 changes: 24 additions & 13 deletions api/api/utils/image_proxy/__init__.py
@@ -6,11 +6,15 @@
from django.http import HttpResponse
from rest_framework.exceptions import UnsupportedMediaType

+import aiohttp
+import django_async_redis
import django_redis
import requests
import sentry_sdk
+from asgiref.sync import sync_to_async
from sentry_sdk import push_scope, set_context

+from api.utils.aiohttp import get_aiohttp_session
from api.utils.image_proxy.exception import UpstreamThumbnailException
from api.utils.image_proxy.extension import get_image_extension
from api.utils.image_proxy.photon import get_photon_request_params
@@ -58,7 +62,7 @@ def get_request_params_for_extension(
)


-def get(
+async def get(
image_url: str,
media_identifier: str,
accept_header: str = "image/*",
@@ -70,10 +74,11 @@ def get(
original image if the file type is SVG. Otherwise, raise an exception.
"""
logger = parent_logger.getChild("get")
-    tallies = django_redis.get_redis_connection("tallies")
+    tallies = await django_async_redis.get_redis_connection("atallies")

month = get_monthly_timestamp()

-    image_extension = get_image_extension(image_url, media_identifier)
+    image_extension = await get_image_extension(image_url, media_identifier)

headers = {"Accept": accept_header} | HEADERS

@@ -90,22 +95,28 @@
)

try:
-        upstream_response = requests.get(
+        session = await get_aiohttp_session()
+        upstream_response = await session.get(
            upstream_url,
-            timeout=15,
            params=params,
            headers=headers,
+            timeout=15,
        )
+        await tallies.incr(
+            f"thumbnail_response_code:{month}:{upstream_response.status}"
+        )
-        tallies.incr(f"thumbnail_response_code:{month}:{upstream_response.status_code}")
-        tallies.incr(
+        await tallies.incr(
            f"thumbnail_response_code_by_domain:{domain}:"
-            f"{month}:{upstream_response.status_code}"
+            f"{month}:{upstream_response.status}"
        )
        upstream_response.raise_for_status()
+        body = await upstream_response.read()
except Exception as exc:
+        # REMOVE THIS LINE. JUST FOR TESTING TO SEE THE REAL EXCEPTION
+        raise exc
exception_name = f"{exc.__class__.__module__}.{exc.__class__.__name__}"
key = f"thumbnail_error:{exception_name}:{domain}:{month}"
-        count = tallies.incr(key)
+        count = await tallies.incr(key)
if count <= settings.THUMBNAIL_ERROR_INITIAL_ALERT_THRESHOLD or (
count % settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY == 0
):
@@ -121,21 +132,21 @@
scope.set_tag(
"occurrences", settings.THUMBNAIL_ERROR_REPEATED_ALERT_FREQUENCY
)
                sentry_sdk.capture_exception(exc)
if isinstance(exc, requests.exceptions.HTTPError):
-        tallies.incr(
+        await tallies.incr(
f"thumbnail_http_error:{domain}:{month}:{exc.response.status_code}:{exc.response.text}"
)
raise UpstreamThumbnailException(f"Failed to render thumbnail. {exc}")

-    res_status = upstream_response.status_code
+    res_status = upstream_response.status
content_type = upstream_response.headers.get("Content-Type")
logger.debug(
f"Image proxy response status: {res_status}, content-type: {content_type}"
)

return HttpResponse(
-        upstream_response.content,
+        body,
status=res_status,
content_type=content_type,
)
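Since `get` is now a coroutine, its callers must await it from an async view. A plausible shape, assuming a plain async Django view (the PR's actual view wiring is not shown in this diff):

```python
# Hypothetical async view; the name, parameters, and routing are assumptions.
from api.utils import image_proxy


async def thumbnail_view(request, identifier: str, image_url: str):
    return await image_proxy.get(
        image_url,
        identifier,
        accept_header=request.headers.get("Accept", "image/*"),
    )
```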
25 changes: 16 additions & 9 deletions api/api/utils/image_proxy/extension.py
@@ -1,29 +1,36 @@
from os.path import splitext
from urllib.parse import urlparse

-import django_redis
-import requests
+import django_async_redis
import sentry_sdk
+from asgiref.sync import sync_to_async

+from api.utils.aiohttp import get_aiohttp_session
from api.utils.image_proxy.exception import UpstreamThumbnailException


-def get_image_extension(image_url: str, media_identifier: str) -> str | None:
-    cache = django_redis.get_redis_connection("default")
+async def get_image_extension(image_url: str, media_identifier: str) -> str | None:
+    """
+    Retrieve image extension from host or cache.
+
+    Does not use async Redis client due to issues with `django-async-redis`
+    incorrectly closing the event loop during the request lifecycle.
+    """
+    cache = await django_async_redis.get_redis_connection("adefault")
key = f"media:{media_identifier}:thumb_type"

ext = _get_file_extension_from_url(image_url)

if not ext:
# If the extension is not present in the URL, try to get it from the redis cache
-        ext = cache.get(key)
+        ext = await sync_to_async(cache.get)(key)
ext = ext.decode("utf-8") if ext else None

if not ext:
# If the extension is still not present, try getting it from the content type
try:
-            response = requests.head(image_url, timeout=10)
-            response.raise_for_status()
+            session = await get_aiohttp_session()
+            async with session.head(image_url, timeout=10) as response:
+                response.raise_for_status()
except Exception as exc:
sentry_sdk.capture_exception(exc)
raise UpstreamThumbnailException(
@@ -37,7 +44,7 @@ def get_image_extension(image_url: str, media_identifier: str) -> str | None:
else:
ext = None

-    cache.set(key, ext if ext else "unknown")
+    await sync_to_async(cache.set)(key, ext if ext else "unknown")
return ext

