diff --git a/backend/danswer/connectors/cross_connector_utils/miscellaneous_utils.py b/backend/danswer/connectors/cross_connector_utils/miscellaneous_utils.py
index 1b79c4a4faf..8e8ea8d7d65 100644
--- a/backend/danswer/connectors/cross_connector_utils/miscellaneous_utils.py
+++ b/backend/danswer/connectors/cross_connector_utils/miscellaneous_utils.py
@@ -23,7 +23,16 @@ def datetime_to_utc(dt: datetime) -> datetime:
 
 
 def time_str_to_utc(datetime_str: str) -> datetime:
-    dt = parse(datetime_str)
+    try:
+        dt = parse(datetime_str)
+    except ValueError:
+        # Handle malformed timezone by attempting to fix common format issues
+        if "0000" in datetime_str:
+            # Convert "0000" to "+0000" for proper timezone parsing
+            fixed_dt_str = datetime_str.replace(" 0000", " +0000")
+            dt = parse(fixed_dt_str)
+        else:
+            raise
     return datetime_to_utc(dt)
 
 
diff --git a/backend/danswer/connectors/gmail/connector.py b/backend/danswer/connectors/gmail/connector.py
index 42d2f305f73..376f03e71d0 100644
--- a/backend/danswer/connectors/gmail/connector.py
+++ b/backend/danswer/connectors/gmail/connector.py
@@ -1,4 +1,8 @@
+import re
+import time
 from base64 import urlsafe_b64decode
+from datetime import datetime
+from datetime import timezone
 from typing import Any
 from typing import cast
 from typing import Dict
@@ -6,6 +10,7 @@
 from google.oauth2.credentials import Credentials as OAuthCredentials  # type: ignore
 from google.oauth2.service_account import Credentials as ServiceAccountCredentials  # type: ignore
 from googleapiclient import discovery  # type: ignore
+from googleapiclient.errors import HttpError  # type: ignore
 
 from danswer.configs.app_configs import INDEX_BATCH_SIZE
 from danswer.configs.constants import DocumentSource
@@ -34,6 +39,64 @@
 logger = setup_logger()
 
 
+def _execute_with_retry(request: Any) -> Any:
+    max_attempts = 10
+    attempt = 0
+
+    while attempt < max_attempts:
+        # Note for reasons unknown, the Google API will sometimes return a 429
+        # and even after waiting the retry period, it will return another 429.
+        # It could be due to a few possibilities:
+        # 1. Other things are also requesting from the Gmail API with the same key
+        # 2. It's a rolling rate limit so the moment we get some amount of requests cleared, we hit it again very quickly
+        # 3. The retry-after has a maximum and we've already hit the limit for the day
+        # or it's something else...
+        try:
+            return request.execute()
+        except HttpError as error:
+            attempt += 1
+
+            if error.resp.status == 429:
+                # Attempt to get 'Retry-After' from headers
+                retry_after = error.resp.get("Retry-After")
+                if retry_after:
+                    sleep_time = int(retry_after)
+                else:
+                    # Extract 'Retry after' timestamp from error message
+                    match = re.search(
+                        r"Retry after (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)",
+                        str(error),
+                    )
+                    if match:
+                        retry_after_timestamp = match.group(1)
+                        retry_after_dt = datetime.strptime(
+                            retry_after_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ"
+                        ).replace(tzinfo=timezone.utc)
+                        current_time = datetime.now(timezone.utc)
+                        sleep_time = max(
+                            int((retry_after_dt - current_time).total_seconds()),
+                            0,
+                        )
+                    else:
+                        logger.error(
+                            f"No Retry-After header or timestamp found in error message: {error}"
+                        )
+                        sleep_time = 60
+
+                sleep_time += 3  # Add a buffer to be safe
+
+                logger.info(
+                    f"Rate limit exceeded. Attempt {attempt}/{max_attempts}. Sleeping for {sleep_time} seconds."
+                )
+                time.sleep(sleep_time)
+
+            else:
+                raise
+
+    # If we've exhausted all attempts
+    raise Exception(f"Failed to execute request after {max_attempts} attempts")
+
+
 class GmailConnector(LoadConnector, PollConnector):
     def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
         self.batch_size = batch_size
@@ -156,7 +219,7 @@ def _fetch_mails_from_gmail(
         query = GmailConnector._build_time_range_query(time_range_start, time_range_end)
         service = discovery.build("gmail", "v1", credentials=self.creds)
         while page_token is not None:
-            result = (
+            result = _execute_with_retry(
                 service.users()
                 .messages()
                 .list(
@@ -165,18 +228,17 @@
                     q=query,
                     maxResults=self.batch_size,
                 )
-                .execute()
             )
+
             page_token = result.get("nextPageToken")
             messages = result.get("messages", [])
             doc_batch = []
             for message in messages:
                 message_id = message["id"]
-                msg = (
+                msg = _execute_with_retry(
                     service.users()
                     .messages()
                     .get(userId="me", id=message_id, format="full")
-                    .execute()
                 )
                 doc = self._email_to_document(msg)
                 doc_batch.append(doc)
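
For context on the `miscellaneous_utils.py` change, the sketch below (not part of the patch) mirrors the patched `time_str_to_utc` as a standalone function and feeds it a hypothetical date string whose UTC offset is missing its `+` sign. The sample input and the inlined UTC normalization are illustrative assumptions; the real helper delegates to `datetime_to_utc`.

```python
from datetime import datetime, timezone
from dateutil.parser import parse


def time_str_to_utc(datetime_str: str) -> datetime:
    # Standalone copy of the patched helper for illustration only;
    # the real implementation lives in miscellaneous_utils.py.
    try:
        dt = parse(datetime_str)
    except ValueError:
        # Recovery path added by the patch: a bare "0000" offset (missing its
        # leading "+") is rewritten to "+0000" and parsed again.
        if "0000" in datetime_str:
            dt = parse(datetime_str.replace(" 0000", " +0000"))
        else:
            raise
    # Simplified stand-in for datetime_to_utc(): treat naive results as UTC.
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


# Hypothetical email-style "Date" value with the malformed offset:
print(time_str_to_utc("Tue, 22 Aug 2023 09:15:42 0000"))
```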
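And a usage sketch for `_execute_with_retry`: the helper accepts any *unexecuted* googleapiclient request object, since it only needs `.execute()` and retries on HTTP 429. It is module-private to `connector.py`, so in practice it is called from inside the connector as shown in the hunks above; the credential setup and `getProfile` call below are assumptions made only to keep the sketch self-contained.

```python
from google.oauth2.credentials import Credentials as OAuthCredentials  # type: ignore
from googleapiclient import discovery  # type: ignore

# Assumed access token obtained elsewhere (e.g. via the connector's credential flow).
creds = OAuthCredentials(token="ya29.example-access-token")

service = discovery.build("gmail", "v1", credentials=creds)

# Pass the request without calling .execute(); _execute_with_retry executes it
# and sleeps/retries when the Gmail API answers with a 429 rate-limit error.
profile = _execute_with_retry(service.users().getProfile(userId="me"))
print(profile.get("emailAddress"))
```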