Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gmail Connector Robustify #3000

Merged
merged 6 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,16 @@ def datetime_to_utc(dt: datetime) -> datetime:


def time_str_to_utc(datetime_str: str) -> datetime:
    """Parse a date/time string and return it converted to UTC.

    The string is parsed with ``parse``. If that raises ``ValueError`` and the
    string contains a sign-less zero offset (" 0000"), the offset is rewritten
    to " +0000" and parsing is retried once.

    Args:
        datetime_str: The textual timestamp to parse.

    Returns:
        The parsed datetime passed through ``datetime_to_utc``.

    Raises:
        ValueError: If the string cannot be parsed, including after the
            offset repair has been applied.
    """
    try:
        dt = parse(datetime_str)
    except ValueError:
        # Only retry when the repair below would actually change the string;
        # otherwise re-raise the original parse error untouched instead of
        # failing a second time on identical input.
        if " 0000" not in datetime_str:
            raise
        # Convert " 0000" to " +0000" for proper timezone parsing
        fixed_dt_str = datetime_str.replace(" 0000", " +0000")
        dt = parse(fixed_dt_str)
    return datetime_to_utc(dt)


Expand Down
70 changes: 66 additions & 4 deletions backend/danswer/connectors/gmail/connector.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import re
import time
from base64 import urlsafe_b64decode
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from typing import Dict

from google.oauth2.credentials import Credentials as OAuthCredentials # type: ignore
from google.oauth2.service_account import Credentials as ServiceAccountCredentials # type: ignore
from googleapiclient import discovery # type: ignore
from googleapiclient.errors import HttpError # type: ignore

from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
Expand Down Expand Up @@ -34,6 +39,64 @@
logger = setup_logger()


def _execute_with_retry(request: Any) -> Any:
max_attempts = 10
attempt = 0

while attempt < max_attempts:
# Note for reasons unknown, the Google API will sometimes return a 429
# and even after waiting the retry period, it will return another 429.
# It could be due to a few possibilities:
# 1. Other things are also requesting from the Gmail API with the same key
# 2. It's a rolling rate limit so the moment we get some amount of requests cleared, we hit it again very quickly
# 3. The retry-after has a maximum and we've already hit the limit for the day
# or it's something else...
try:
return request.execute()
except HttpError as error:
attempt += 1

if error.resp.status == 429:
# Attempt to get 'Retry-After' from headers
retry_after = error.resp.get("Retry-After")
if retry_after:
sleep_time = int(retry_after)
else:
# Extract 'Retry after' timestamp from error message
match = re.search(
r"Retry after (\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)",
str(error),
)
if match:
retry_after_timestamp = match.group(1)
retry_after_dt = datetime.strptime(
retry_after_timestamp, "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=timezone.utc)
current_time = datetime.now(timezone.utc)
sleep_time = max(
int((retry_after_dt - current_time).total_seconds()),
0,
)
else:
logger.error(
f"No Retry-After header or timestamp found in error message: {error}"
)
sleep_time = 60

sleep_time += 3 # Add a buffer to be safe

logger.info(
f"Rate limit exceeded. Attempt {attempt}/{max_attempts}. Sleeping for {sleep_time} seconds."
)
time.sleep(sleep_time)

else:
raise

# If we've exhausted all attempts
raise Exception(f"Failed to execute request after {max_attempts} attempts")


class GmailConnector(LoadConnector, PollConnector):
def __init__(self, batch_size: int = INDEX_BATCH_SIZE) -> None:
self.batch_size = batch_size
Expand Down Expand Up @@ -156,7 +219,7 @@ def _fetch_mails_from_gmail(
query = GmailConnector._build_time_range_query(time_range_start, time_range_end)
service = discovery.build("gmail", "v1", credentials=self.creds)
while page_token is not None:
result = (
result = _execute_with_retry(
service.users()
.messages()
.list(
Expand All @@ -165,18 +228,17 @@ def _fetch_mails_from_gmail(
q=query,
maxResults=self.batch_size,
)
.execute()
)

page_token = result.get("nextPageToken")
messages = result.get("messages", [])
doc_batch = []
for message in messages:
message_id = message["id"]
msg = (
msg = _execute_with_retry(
service.users()
.messages()
.get(userId="me", id=message_id, format="full")
.execute()
)
doc = self._email_to_document(msg)
doc_batch.append(doc)
Expand Down
Loading