Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Youtube analytics - added custom backoff logic #17454

Merged
merged 9 commits into from
Oct 7, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,82 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


class JobsResource(HttpStream):
class CustomBackoffMixin:
    """Retry policy for streams that talk to the YouTube Reporting API.

    The mixin is listed before the HTTP stream base class so that its
    ``should_retry`` and ``retry_factor`` take precedence. It relies on the
    host class to provide ``self.logger``.
    """

    def daily_quota_exceeded(self, response: requests.Response) -> bool:
        """Return True when the response reports that the per-day request quota is exhausted.

        Response example (this one is the per-minute limit; the per-day limit
        has the same shape with "quota_limit": "FreeQuotaRequestsPerDayPerProject"):
        {
          "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
              {
                "reason": "RATE_LIMIT_EXCEEDED",
                "metadata": {
                  "consumer": "projects/863188056127",
                  "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                  "quota_limit_value": "60",
                  "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                  "service": "youtubereporting.googleapis.com",
                }
              },
            ]
          }
        }

        :param response: HTTP response received from the API
        :return: True if any ``RATE_LIMIT_EXCEEDED`` detail names the daily quota, False otherwise
                 (including when the body is not JSON or does not have the expected shape)
        """
        try:
            payload = response.json()
        except ValueError:
            # The body is not JSON, so it cannot describe a daily quota;
            # let the caller treat the response as an ordinary rate limit.
            return False

        # Validate every level instead of chaining .get(): an unexpected shape
        # (e.g. "error" being a plain string) must not crash the retry decision.
        error = payload.get("error") if isinstance(payload, dict) else None
        details = error.get("details") if isinstance(error, dict) else None
        if not isinstance(details, list):
            return False

        # Inspect every detail: the daily-quota entry is not guaranteed to be the
        # first RATE_LIMIT_EXCEEDED item in the list.
        for detail in details:
            if not isinstance(detail, dict) or detail.get("reason") != "RATE_LIMIT_EXCEEDED":
                continue
            metadata = detail.get("metadata")
            if not isinstance(metadata, dict):
                continue
            if metadata.get("quota_limit") == "FreeQuotaRequestsPerDayPerProject":
                self.logger.error(f"Exceeded daily quota: {metadata.get('quota_limit_value')} reqs/day")
                return True
        return False

    def should_retry(self, response: requests.Response) -> bool:
        """
        Override to set different conditions for backoff based on the response from the server.

        Back off on the following HTTP response statuses:
         - 500s to handle transient server errors
         - 429 (Too Many Requests) indicating rate limiting.
           Different behavior in case of 'RATE_LIMIT_EXCEEDED':

            Requests Per Minute:
              "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'."
              "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
              "quota_limit_value": "60",

              --> use increased retry_factor (30 seconds)

            Requests Per Day:
              "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per day' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'."
              "quota_limit": "FreeQuotaRequestsPerDayPerProject",
              "quota_limit_value": "20000",

              --> just throw an error, next scan is reasonable to start only in 1 day.

        :param response: HTTP response received from the API
        :return: True if the request should be retried, False otherwise
        """
        status = response.status_code
        if 500 <= status < 600:
            return True

        # A 429 is retried unless it reports the daily quota, which will not
        # recover within any reasonable backoff window.
        return status == 429 and not self.daily_quota_exceeded(response)

    @property
    def retry_factor(self) -> float:
        """
        Default FreeQuotaRequestsPerMinutePerProject is 60 reqs/min, so reasonable delay is 30 seconds
        """
        return 30


class JobsResource(CustomBackoffMixin, HttpStream):
"""
https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs

Expand Down Expand Up @@ -79,7 +154,7 @@ def create(self, name):
return result["id"]


class ReportResources(HttpStream):
class ReportResources(CustomBackoffMixin, HttpStream):
"https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs.reports/list"

name = None
Expand Down Expand Up @@ -119,7 +194,7 @@ def path(
return "jobs/{}/reports".format(self.job_id)


class ChannelReports(HttpSubStream):
class ChannelReports(CustomBackoffMixin, HttpSubStream):
"https://developers.google.com/youtube/reporting/v1/reports/channel_reports"

name = None
Expand Down
11 changes: 6 additions & 5 deletions docs/integrations/sources/youtube-analytics.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ Quota usage is not an issue because data is retrieved once and then filtered, so

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |
| Version | Date | Pull Request | Subject |
|:----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------|
| 0.1.3 | 2022-09-30 | [17454](https://github.com/airbytehq/airbyte/pull/17454) | Added custom backoff logic |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |