Skip to content

Commit

Permalink
fix(backend): Fix conn_retry decorator possible incorrect behaviour o…
Browse files Browse the repository at this point in the history
…n failed async function (#8836)

This fix is triggered by an error observed on db connection failure on
SupaBase:
```
2024-11-28 07:45:24,724 INFO  [DatabaseManager] Starting...
2024-11-28 07:45:24,726 INFO  [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection started...
2024-11-28 07:45:24,726 INFO  [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection completed successfully.
{"is_panic":false,"message":"Can't reach database server at `...pooler.supabase.com:5432`\n\nPlease make sure your database server is running at `....pooler.supabase.com:5432`.","meta":{"database_host":"...pooler.supabase.com","database_port":5432},"error_code":"P1001"}
2024-11-28 07:45:35,153 INFO  [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection failed: Could not connect to the query engine. Retrying now...
2024-11-28 07:45:36,155 INFO  [PID-18|DatabaseManager|Redis-e14a33de-2d81-4536-b48b-a8aa4b1f4766] Acquiring connection started...
2024-11-28 07:45:36,181 INFO  [PID-18|DatabaseManager|Redis-e14a33de-2d81-4536-b48b-a8aa4b1f4766] Acquiring connection completed successfully.
2024-11-28 07:45:36,183 INFO  [PID-18|DatabaseManager|Pyro-2722cd29-4dbd-4cf9-882f-73842658599d] Starting Pyro Service started...
2024-11-28 07:45:36,189 INFO  [DatabaseManager] Connected to Pyro; URI = PYRO:DatabaseManager@0.0.0.0:8005
2024-11-28 07:46:28,241 ERROR  Error in get_user_integrations: All connection attempts failed
```

Even though the log line
```
2024-11-28 07:45:35,153 INFO  [PID-18|DatabaseManager|Prisma-7f32369c-6432-4edb-8e71-ef820332b9e4] Acquiring connection failed: Could not connect to the query engine. Retrying now...
```
is present, the Redis connection is still proceeding without waiting for
the retry to complete. This was likely caused by Tenacity not fully
awaiting the DB connection acquisition command.

### Changes 🏗️

* Add special handling for the async function to explicitly await the
function execution result on each retry.
* Explicitly raise exceptions on `db.connect()` if the db is not
connected even after `prisma.connect()` command.

### Checklist 📋

#### For code changes:
- [ ] I have clearly listed my changes in the PR description
- [ ] I have made a test plan
- [ ] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [ ] ...

<details>
  <summary>Example test plan</summary>
  
  - [ ] Create from scratch and execute an agent with at least 3 blocks
- [ ] Import an agent from file upload, and confirm it executes
correctly
  - [ ] Upload agent to marketplace
- [ ] Import an agent from marketplace and confirm it executes correctly
  - [ ] Edit an agent from monitor, and confirm it executes correctly
</details>

#### For configuration changes:
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my
changes
- [ ] I have included a list of my configuration changes in the PR
description (under **Changes**)

<details>
  <summary>Examples of configuration changes</summary>

  - Changing ports
  - Adding new services that need to communicate with each other
  - Secrets or environment variable changes
  - New infrastructure or infrastructure changes, such as databases
</details>
  • Loading branch information
majdyz authored Nov 29, 2024
1 parent 29f177e commit 63af42d
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 12 deletions.
8 changes: 8 additions & 0 deletions autogpt_platform/backend/backend/data/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@
async def connect():
    """Open the Prisma connection unless one is already established.

    Raises:
        ConnectionError: if `prisma.is_connected()` is still false after
            `prisma.connect()` has been awaited.
    """
    if not prisma.is_connected():
        await prisma.connect()

        # Verify the outcome instead of trusting that connect() returning
        # normally means the connection is actually up.
        if not prisma.is_connected():
            raise ConnectionError("Failed to connect to Prisma.")


@conn_retry("Prisma", "Releasing connection")
async def disconnect():
    """Close the Prisma connection if one is currently open.

    Raises:
        ConnectionError: if `prisma.is_connected()` is still true after
            `prisma.disconnect()` has been awaited.
    """
    if prisma.is_connected():
        await prisma.disconnect()

        # Verify the outcome so a disconnect that silently did nothing is
        # reported as a failure rather than treated as success.
        if prisma.is_connected():
            raise ConnectionError("Failed to disconnect from Prisma.")


@asynccontextmanager
async def transaction():
Expand Down
44 changes: 32 additions & 12 deletions autogpt_platform/backend/backend/util/retry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import logging
import os
import threading
Expand All @@ -20,7 +21,14 @@ def _log_prefix(resource_name: str, conn_id: str):
return f"[PID-{os.getpid()}|THREAD-{threading.get_native_id()}|{get_service_name()}|{resource_name}-{conn_id}]"


def conn_retry(resource_name: str, action_name: str, max_retry: int = 5):
def conn_retry(
resource_name: str,
action_name: str,
max_retry: int = 5,
multiplier: int = 1,
min_wait: float = 1,
max_wait: float = 30,
):
conn_id = str(uuid4())

def on_retry(retry_state):
Expand All @@ -29,27 +37,39 @@ def on_retry(retry_state):
logger.error(f"{prefix} {action_name} failed: {exception}. Retrying now...")

def decorator(func):
    """Wrap `func` with retrying and start/success/failure logging.

    A single retrying callable is built from the enclosing `conn_retry`
    arguments (`max_retry`, `multiplier`, `min_wait`, `max_wait`) and
    shared by both wrappers. Which wrapper is returned depends on whether
    `func` is a coroutine function, so an async `func` stays awaitable and
    its retried call is awaited *before* the success message is logged.

    Args:
        func: the sync or async callable to protect with retries.

    Returns:
        `async_wrapper` when `func` is a coroutine function, otherwise
        `sync_wrapper`. Both re-raise the last exception once the retries
        are exhausted (`reraise=True`).
    """
    is_coroutine = asyncio.iscoroutinefunction(func)
    retry_decorator = retry(
        # max_retry counts *retries*, so allow one extra initial attempt.
        stop=stop_after_attempt(max_retry + 1),
        wait=wait_exponential(multiplier=multiplier, min=min_wait, max=max_wait),
        before_sleep=on_retry,
        reraise=True,
    )
    wrapped_func = retry_decorator(func)

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        prefix = _log_prefix(resource_name, conn_id)
        logger.info(f"{prefix} {action_name} started...")
        try:
            result = wrapped_func(*args, **kwargs)
            logger.info(f"{prefix} {action_name} completed successfully.")
            return result
        except Exception as e:
            logger.error(f"{prefix} {action_name} failed after retries: {e}")
            raise

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        prefix = _log_prefix(resource_name, conn_id)
        logger.info(f"{prefix} {action_name} started...")
        try:
            # Must be awaited here: returning the un-awaited result would log
            # success (and let the caller move on) before the operation and
            # its retries have actually finished.
            result = await wrapped_func(*args, **kwargs)
            logger.info(f"{prefix} {action_name} completed successfully.")
            return result
        except Exception as e:
            logger.error(f"{prefix} {action_name} failed after retries: {e}")
            raise

    return async_wrapper if is_coroutine else sync_wrapper

return decorator
49 changes: 49 additions & 0 deletions autogpt_platform/backend/test/util/test_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import asyncio

import pytest

from backend.util.retry import conn_retry


def test_conn_retry_sync_function():
    """Check that `conn_retry` retries a plain (non-async) function.

    `retry_count` is a countdown shared with the decorated function: every
    attempt decrements it, and the attempt fails while it is still positive.
    """
    retry_count = 0

    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.1, min_wait=0.1)
    def test_function():
        nonlocal retry_count
        retry_count -= 1
        if retry_count > 0:
            raise ValueError("Test error")
        return "Success"

    # First attempt fails (2 -> 1), the first retry succeeds (1 -> 0).
    retry_count = 2
    res = test_function()
    assert res == "Success"
    assert retry_count == 0

    # The countdown never reaches zero, so every attempt fails and the
    # original exception is re-raised once the retries are used up.
    retry_count = 100
    with pytest.raises(ValueError) as e:
        test_function()
    # Checked after the `with` block: a statement placed after the raising
    # call inside the block would never execute.
    assert str(e.value) == "Test error"
    # max_retry=2 means one initial attempt plus two retries.
    assert retry_count == 97


@pytest.mark.asyncio
async def test_conn_retry_async_function():
    """Check that `conn_retry` retries and awaits a coroutine function.

    `retry_count` is a countdown shared with the decorated coroutine: every
    attempt decrements it, and the attempt fails while it is still positive.
    """
    retry_count = 0

    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.1, min_wait=0.1)
    async def test_function():
        nonlocal retry_count
        # A short sleep is enough to force a real suspension point; a long
        # one only makes the test slow (it runs once per attempt).
        await asyncio.sleep(0.01)
        retry_count -= 1
        if retry_count > 0:
            raise ValueError("Test error")
        return "Success"

    # First attempt fails (2 -> 1), the first retry succeeds (1 -> 0).
    retry_count = 2
    res = await test_function()
    assert res == "Success"
    assert retry_count == 0

    # The countdown never reaches zero, so every attempt fails and the
    # original exception is re-raised once the retries are used up.
    retry_count = 100
    with pytest.raises(ValueError) as e:
        await test_function()
    # Checked after the `with` block: a statement placed after the raising
    # call inside the block would never execute.
    assert str(e.value) == "Test error"
    # max_retry=2 means one initial attempt plus two retries.
    assert retry_count == 97

0 comments on commit 63af42d

Please sign in to comment.