Merge pull request #812 from sabeechen/dev
Release 0.110.2
sabeechen committed Mar 24, 2023
2 parents 3b5936e + a46c0d6 commit dc294c7
Showing 19 changed files with 532 additions and 127 deletions.
4 changes: 3 additions & 1 deletion .devcontainer/requirements-dev.txt
@@ -28,4 +28,6 @@ beautifulsoup4
firebase-admin
aiofile
grpcio
aioping
aioping
pytz
tzlocal
4 changes: 4 additions & 0 deletions hassio-google-drive-backup/CHANGELOG.md
@@ -1,3 +1,7 @@
## v0.110.2 [2023-03-24]
- Fix a potential cause of SSL errors when communicating with Google Drive
- Fix a bug causing backups to be requested indefinitely if scheduled during DST transitions.

## v0.110.1 [2023-01-09]
- Adds some additional options for donating
- Mitigates SD card corruption by redundantly storing config files needed for addon startup.
9 changes: 6 additions & 3 deletions hassio-google-drive-backup/backup/creds/driverequester.py
@@ -1,4 +1,4 @@
from aiohttp import ClientSession, ContentTypeError, ClientConnectorError, ClientTimeout
from aiohttp import ClientSession, ContentTypeError, ClientConnectorError, ClientTimeout, ClientResponse
from aiohttp.client_exceptions import ServerTimeoutError, ServerDisconnectedError, ClientOSError
from backup.exceptions import GoogleUnexpectedError, GoogleInternalError, GoogleRateLimitError, GoogleCredentialsExpired, CredRefreshGoogleError, DriveQuotaExceeded, GoogleDrivePermissionDenied, GoogleDnsFailure, GoogleCantConnect, GoogleTimeoutError
from backup.util import Resolver
@@ -24,20 +24,23 @@ def __init__(self, config: Config, session: ClientSession, resolver: Resolver):
self.resolver = resolver
self.config = config

async def request(self, method, url, headers={}, json=None, data=None):
async def request(self, method, url, headers={}, json=None, data=None) -> ClientResponse:
try:
# MAYBE: Exceptions here should clean up the response object
response = await self.session.request(method, url, headers=headers, json=json, timeout=self.buildTimeout(), data=data)
if response.status < 400:
return response
await self.raiseForKnownErrors(response)
if response.status in PERMISSION_DENIED:
response.release()
raise GoogleCredentialsExpired()
elif response.status in INTERNAL_ERROR:
response.release()
raise GoogleInternalError()
elif response.status in RATE_LIMIT_EXCEEDED or response.status in TOO_MANY_REQUESTS:
response.release()
raise GoogleRateLimitError()
elif response.status in REQUEST_TIMEOUT:
response.release()
raise GoogleTimeoutError()
response.raise_for_status()
return response
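
The change above releases each error response before raising a typed exception, so the underlying connection is returned to aiohttp's pool instead of lingering until garbage collection, a plausible contributor to the SSL errors mentioned in the changelog. A minimal sketch of the same pattern, with a hypothetical StatusError standing in for the add-on's Google-specific exception classes:

import aiohttp


class StatusError(Exception):
    """Hypothetical stand-in for the add-on's typed Google errors."""


async def request_checked(session: aiohttp.ClientSession, method: str, url: str) -> aiohttp.ClientResponse:
    resp = await session.request(method, url)
    if resp.status < 400:
        # Success: the caller owns the response and must release or close it.
        return resp
    # Release the connection before raising so it goes back to the pool
    # instead of staying checked out until the response is garbage collected.
    resp.release()
    raise StatusError("HTTP {0} from {1}".format(resp.status, url))
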
13 changes: 2 additions & 11 deletions hassio-google-drive-backup/backup/creds/exchanger.py
@@ -71,12 +71,8 @@ async def exchange(self, code):
KEY_GRANT_TYPE: 'authorization_code'
}
resp = None
try:
resp = await self.drive.request("post", self.config.get(Setting.DRIVE_TOKEN_URL), data=data)
async with await self.drive.request("post", self.config.get(Setting.DRIVE_TOKEN_URL), data=data) as resp:
return Creds.load(self.time, await resp.json(), id=self._client_id, secret=self._client_secret)
finally:
if resp is not None:
resp.release()

async def refresh(self, creds: Creds):
if creds.secret is not None:
@@ -91,9 +87,7 @@ async def _refresh_google(self, creds: Creds):
KEY_REFRESH_TOKEN: creds.refresh_token,
KEY_GRANT_TYPE: 'refresh_token'
}
resp = None
try:
resp = await self.drive.request("post", self.config.get(Setting.DRIVE_REFRESH_URL), data=data)
async with await self.drive.request("post", self.config.get(Setting.DRIVE_REFRESH_URL), data=data) as resp:
data = await resp.json()
return Creds(
self.time,
Expand All @@ -103,9 +97,6 @@ async def _refresh_google(self, creds: Creds):
refresh_token=creds.refresh_token,
expiration=self._get_expiration(data),
original_expiration=creds.original_expiration)
finally:
if resp is not None:
resp.release()

async def _refresh_default(self, creds: Creds):
data = {
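
The try/finally blocks above give way to async with, relying on aiohttp's ClientResponse being an async context manager that releases the connection on exit even if parsing the body raises. A minimal sketch of the same pattern against a plain ClientSession (the add-on routes the call through its DriveRequester instead; token_url and payload here are illustrative):

import aiohttp


async def exchange_token(session: aiohttp.ClientSession, token_url: str, payload: dict) -> dict:
    # The response is released when the block exits, whether the JSON
    # parse succeeds or an exception propagates out of the block.
    async with await session.request("post", token_url, data=payload) as resp:
        resp.raise_for_status()
        return await resp.json()
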
111 changes: 51 additions & 60 deletions hassio-google-drive-backup/backup/drive/driverequests.py
@@ -5,7 +5,7 @@
from urllib.parse import urlencode
from datetime import datetime, timedelta

from aiohttp import ClientSession, ClientTimeout
from aiohttp import ClientSession, ClientTimeout, ClientResponse
from aiohttp.client_exceptions import ClientResponseError, ServerTimeoutError
from injector import inject, singleton

@@ -160,7 +160,8 @@ async def get(self, id):
"fields": SELECT_FIELDS,
"supportsAllDrives": "true"
}
return await self.retryRequest("GET", URL_FILES + id + "/?" + urlencode(q), is_json=True)
async with await self.retryRequest("GET", URL_FILES + id + "/?" + urlencode(q)) as response:
return await response.json()

async def download(self, id, size):
ret = AsyncHttpGetter(self.config.get(Setting.DRIVE_URL) + URL_FILES + id + "/?alt=media&supportsAllDrives=true",
@@ -189,27 +190,27 @@ async def query(self, query):
}
if continuation:
q["pageToken"] = continuation
response = await self.retryRequest("GET", URL_FILES + "?" + urlencode(q), is_json=True)
for item in response['files']:
yield item
if "nextPageToken" not in response or len(response['nextPageToken']) <= 0:
break
else:
continuation = response['nextPageToken']
async with await self.retryRequest("GET", URL_FILES + "?" + urlencode(q)) as response:
data = await response.json()
for item in data['files']:
yield item
if "nextPageToken" not in data or len(data['nextPageToken']) <= 0:
break
else:
continuation = data['nextPageToken']

async def update(self, id, update_metadata):
resp = await self.retryRequest("PATCH", URL_FILES + id + "/?supportsAllDrives=true", json=update_metadata)
async with resp:
async with await self.retryRequest("PATCH", URL_FILES + id + "/?supportsAllDrives=true", json=update_metadata):
pass

async def delete(self, id):
resp = await self.retryRequest("DELETE", URL_FILES + id + "/?supportsAllDrives=true")
async with resp:
async with await self.retryRequest("DELETE", URL_FILES + id + "/?supportsAllDrives=true"):
pass

async def getAboutInfo(self):
q = {"fields": 'storageQuota,user'}
return await self.retryRequest("GET", URL_ABOUT + "?" + urlencode(q), is_json=True)
async with await self.retryRequest("GET", URL_ABOUT + "?" + urlencode(q)) as resp:
return await resp.json()

async def create(self, stream, metadata, mime_type):
# Upload logic is complicated. See https://developers.google.com/drive/api/v3/manage-uploads#resumable
@@ -225,8 +226,7 @@ async def create(self, stream, metadata, mime_type):
"Content-Range": "bytes */{0}".format(total_size)
}
try:
initial = await self.retryRequest("PUT", self.last_attempt_location, headers=headers, patch_url=False)
async with initial:
async with await self.retryRequest("PUT", self.last_attempt_location, headers=headers, patch_url=False) as initial:
if initial.status == 308:
# We can resume the upload, check where it left off
if 'Range' in initial.headers:
@@ -244,7 +244,7 @@ async def create(self, stream, metadata, mime_type):
except ClientResponseError as e:
if e.status == 410:
# Drive doesn't recognize the resume token, so we'll just have to start over.
logger.debug("Drive upload session wasn't recognized, restarting upload form the beginning.")
logger.debug("Drive upload session wasn't recognized, restarting upload from the beginning.")
location = None
else:
raise
@@ -256,8 +256,7 @@ async def create(self, stream, metadata, mime_type):
"X-Upload-Content-Type": mime_type,
"X-Upload-Content-Length": str(total_size),
}
initial = await self.retryRequest("POST", URL_START_UPLOAD, headers=headers, json=metadata)
async with initial:
async with await self.retryRequest("POST", URL_START_UPLOAD, headers=headers, json=metadata) as initial:
# Google returns a url in the header "Location", which is where subsequent requests to upload
# the backup's bytes should be sent. Logic below handles uploading the file bytes in chunks.
location = ensureKey(
@@ -277,7 +276,7 @@ async def create(self, stream, metadata, mime_type):
current_chunk_size = BASE_CHUNK_SIZE
while True:
start = stream.position()
data: io.IOBaseBytesio = await stream.read(current_chunk_size)
data = await stream.read(current_chunk_size)
chunk_size = len(data.getbuffer())
if chunk_size == 0:
raise LogicError(
@@ -286,19 +285,35 @@ async def create(self, stream, metadata, mime_type):
"Content-Length": str(chunk_size),
"Content-Range": "bytes {0}-{1}/{2}".format(start, start + chunk_size - 1, total_size)
}
startTime = self.time.now()
logger.debug("Sending {0} bytes to Google Drive".format(current_chunk_size))
try:
startTime = self.time.now()
logger.debug("Sending {0} bytes to Google Drive".format(
current_chunk_size))
partial = await self.retryRequest("PUT", location, headers=headers, data=data, patch_url=False)

# Base the next chunk size on how long it took to send the last chunk.
current_chunk_size = self._getNextChunkSize(
current_chunk_size, (self.time.now() - startTime).total_seconds())

# any time a chunk gets uploaded, reset the retry counter. This lets very flaky connections
# complete eventually after enough retrying.
self.last_attempt_count = 1
async with await self.retryRequest("PUT", location, headers=headers, data=data, patch_url=False) as partial:
# Base the next chunk size on how long it took to send the last chunk.
current_chunk_size = self._getNextChunkSize(
current_chunk_size, (self.time.now() - startTime).total_seconds())

# any time a chunk gets uploaded, reset the retry counter. This lets very flaky connections
# complete eventually after enough retrying.
self.last_attempt_count = 1
yield float(start + chunk_size) / float(total_size)
if partial.status == 200 or partial.status == 201:
# Upload completed, return the object json
self.last_attempt_location = None
self.last_attempt_metadata = None
yield await self.get((await partial.json())['id'])
break
elif partial.status == 308:
# Upload partially complete, seek to the new requested position
range_bytes = ensureKey(
"Range", partial.headers, "Google Drive's upload response headers")
if not RANGE_RE.match(range_bytes):
raise ProtocolError(
"Range", partial.headers, "Google Drive's upload response headers")
position = int(partial.headers["Range"][len("bytes=0-"):])
stream.position(position + 1)
else:
partial.raise_for_status()
except ClientResponseError as e:
if math.floor(e.status / 100) == 4:
# clear the cached session location URI, since a 4XX error
@@ -310,25 +325,6 @@ async def create(self, stream, metadata, mime_type):
raise GoogleSessionError()
else:
raise e
yield float(start + chunk_size) / float(total_size)
if partial.status == 200 or partial.status == 201:
# Upload completed, return the object json
self.last_attempt_location = None
self.last_attempt_metadata = None
yield await self.get((await partial.json())['id'])
break
elif partial.status == 308:
# Upload partially complete, seek to the new requested position
range_bytes = ensureKey(
"Range", partial.headers, "Google Drive's upload response headers")
if not RANGE_RE.match(range_bytes):
raise ProtocolError(
"Range", partial.headers, "Google Drive's upload response headers")
position = int(partial.headers["Range"][len("bytes=0-"):])
stream.position(position + 1)
else:
partial.raise_for_status()
await partial.release()

def _getNextChunkSize(self, last_chunk_size, last_chunk_seconds):
max = BASE_CHUNK_SIZE * math.floor(self.config.get(Setting.MAXIMUM_UPLOAD_CHUNK_BYTES) / BASE_CHUNK_SIZE)
@@ -344,9 +340,10 @@ def _getNextChunkSize(self, last_chunk_size, last_chunk_seconds):
return math.floor(next_chunk / BASE_CHUNK_SIZE) * BASE_CHUNK_SIZE

async def createFolder(self, metadata):
return await self.retryRequest("POST", URL_FILES + "?supportsAllDrives=true", is_json=True, json=metadata)
async with await self.retryRequest("POST", URL_FILES + "?supportsAllDrives=true", json=metadata) as resp:
return await resp.json()

async def retryRequest(self, method, url, auth_headers: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None, json: Optional[Dict[str, Any]] = None, data: Any = None, is_json: bool = False, cred_retry: bool = True, patch_url: bool = True):
async def retryRequest(self, method, url, auth_headers: Optional[Dict[str, str]] = None, headers: Optional[Dict[str, str]] = None, json: Optional[Dict[str, Any]] = None, data: Any = None, cred_retry: bool = True, patch_url: bool = True) -> ClientResponse:
backoff = Backoff(base=DRIVE_RETRY_INITIAL_SECONDS, attempts=DRIVE_MAX_RETRIES)
if patch_url:
url = self.config.get(Setting.DRIVE_URL) + url
@@ -363,13 +360,7 @@ async def retryRequest(self, method, url, auth_headers: Optional[Dict[str, str]]
# aiohttp complains if you pass it a large byte object
data_to_use = io.BytesIO(data_to_use.getbuffer())
data_to_use.seek(0)
response = await self.drive.request(method, url, headers=headers_to_use, json=json, data=data_to_use)
if is_json:
ret = await response.json()
await response.release()
return ret
else:
return response
return await self.drive.request(method, url, headers=headers_to_use, json=json, data=data_to_use)
except GoogleCredentialsExpired:
# Get fresh credentials, then retry right away.
logger.debug("Google Drive credentials have expired. We'll retry with new ones.")
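
The bulk of the changes in this file move each upload response into an async with block. The surrounding logic follows Google Drive's resumable upload protocol: an initial POST returns an upload URL in the Location header, each chunk is sent with PUT and a Content-Range header, a 308 response reports how many bytes the server has stored, and 200 or 201 means the upload is complete. A stripped-down sketch of that flow against a plain ClientSession, ignoring the add-on's retry, backoff, and adaptive chunk sizing (start_url and the fixed 256 KiB chunk size are illustrative):

import io

import aiohttp

CHUNK = 256 * 1024  # resumable uploads expect chunk sizes in multiples of 256 KiB


async def resumable_upload(session: aiohttp.ClientSession, start_url: str,
                           metadata: dict, payload: bytes) -> dict:
    total = len(payload)
    headers = {"X-Upload-Content-Length": str(total)}
    async with await session.request("POST", start_url, json=metadata, headers=headers) as resp:
        resp.raise_for_status()
        location = resp.headers["Location"]  # subsequent chunks are PUT here

    position = 0
    while position < total:
        chunk = payload[position:position + CHUNK]
        headers = {
            "Content-Length": str(len(chunk)),
            "Content-Range": "bytes {0}-{1}/{2}".format(position, position + len(chunk) - 1, total),
        }
        async with await session.request("PUT", location, data=io.BytesIO(chunk), headers=headers) as part:
            if part.status in (200, 201):
                # Upload finished; Drive returns the file resource as JSON.
                return await part.json()
            if part.status == 308:
                # Partially stored; the Range header (e.g. "bytes=0-524287") says how far we got.
                position = int(part.headers["Range"].split("-")[-1]) + 1
                continue
            part.raise_for_status()
            raise RuntimeError("Unexpected status {0} from upload".format(part.status))
    raise RuntimeError("Upload ended without a completed response from Drive")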