Skip to content

Commit

Permalink
🎉 Source YouTube Analytics - added custom backoff logic (airbytehq#17454
Browse files Browse the repository at this point in the history
)

* added custom backoff logic based on youtube analytics limits

* updated docs

* updated docs

* bumped connector version

* added configuration option for test purpose only

* added comments

* added unittest

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
2 people authored and jhammarstedt committed Oct 31, 2022
1 parent b8562ca commit 10b16db
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@
- sourceDefinitionId: afa734e4-3571-11ec-991a-1e0031268139
name: YouTube Analytics
dockerRepository: airbyte/source-youtube-analytics
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/youtube-analytics
icon: youtube.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11539,7 +11539,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-youtube-analytics:0.1.2"
- dockerImage: "airbyte/source-youtube-analytics:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/youtube-analytics"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_youtube_analytics ./source_youtube_analytics
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-youtube-analytics
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tests:
- channel_demographics_a1
- channel_end_screens_a1
- channel_sharing_service_a1
- channel_province_a2
- playlist_basic_a1
- playlist_combined_a1
- playlist_device_os_a1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pkgutil
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand All @@ -18,7 +19,81 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


class JobsResource(HttpStream):
class CustomBackoffMixin:
    """Retry policy shared by the YouTube Reporting API streams.

    The free tier enforces two quotas: 60 requests per minute and
    20000 requests per day.  A per-minute overrun is transient and worth
    retrying with a long delay; a per-day overrun is terminal for the
    current sync, because the quota only resets the next day.
    """

    def daily_quota_exceeded(self, response: requests.Response) -> bool:
        """Return True when a 429 response reports the *daily* quota limit.

        The API describes the violated limit inside the error payload, e.g.:
        {
            "error": {
                "code": 429,
                "status": "RESOURCE_EXHAUSTED",
                "details": [
                    {
                        "reason": "RATE_LIMIT_EXCEEDED",
                        "metadata": {
                            "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                            "quota_limit_value": "60",
                        }
                    },
                ]
            }
        }
        :param response: HTTP response whose error payload is inspected
        :return: True if the daily quota was exhausted, False otherwise
        """
        error_details = response.json().get("error", {}).get("details", [])
        # Only the first RATE_LIMIT_EXCEEDED entry matters — the original
        # loop stopped scanning after encountering one.
        rate_limit_detail = next((d for d in error_details if d.get("reason") == "RATE_LIMIT_EXCEEDED"), None)
        if rate_limit_detail is not None:
            metadata = rate_limit_detail.get("metadata", {})
            if metadata.get("quota_limit") == "FreeQuotaRequestsPerDayPerProject":
                self.logger.error(f"Exceeded daily quota: {metadata.get('quota_limit_value')} reqs/day")
                return True
        return False

    def should_retry(self, response: requests.Response) -> bool:
        """Decide whether a failed request should be retried.

        Retry on:
        - any 5xx status (transient server errors);
        - 429 (Too Many Requests) caused by the per-minute quota
          ("FreeQuotaRequestsPerMinutePerProject", 60 reqs/min) — handled
          with the increased retry_factor below.

        Do NOT retry a 429 caused by the per-day quota
        ("FreeQuotaRequestsPerDayPerProject", 20000 reqs/day): the next
        scan is only reasonable after the daily reset, so fail fast.
        """
        status = response.status_code
        if status in range(500, 600):
            return True
        return status == 429 and not self.daily_quota_exceeded(response)

    @property
    def retry_factor(self) -> float:
        """
        Default FreeQuotaRequestsPerMinutePerProject is 60 reqs/min, so a
        30-second base delay is a reasonable backoff factor.
        """
        return 30


class JobsResource(CustomBackoffMixin, HttpStream):
"""
https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs
Expand Down Expand Up @@ -79,19 +154,36 @@ def create(self, name):
return result["id"]


class ReportResources(HttpStream):
class ReportResources(CustomBackoffMixin, HttpStream):
"https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs.reports/list"

name = None
primary_key = "id"
url_base = "https://youtubereporting.googleapis.com/v1/"

def __init__(self, name: str, jobs_resource: JobsResource, job_id: str, **kwargs):
def __init__(self, name: str, jobs_resource: JobsResource, job_id: str, start_time: str = None, **kwargs):
    """Bind the stream to its report type and (possibly not yet created) job.

    :param name: report type id, used as the stream name
    :param jobs_resource: helper used to lazily create the reporting job
    :param job_id: existing job id, or falsy to create one on first request
    :param start_time: optional RFC3339 lower bound for listed reports
    """
    # Capture stream identity before the HttpStream base initializes.
    self.job_id = job_id
    self.jobs_resource = jobs_resource
    self.start_time = start_time
    self.name = name
    super().__init__(**kwargs)

def path(
    self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
    """Return the reports listing endpoint, lazily creating the job.

    A reporting job may not exist yet for this report type; it is
    created on the first call and its id is cached on the instance.
    """
    if not self.job_id:
        created_job_id = self.jobs_resource.create(self.name)
        self.logger.info(f"YouTube reporting job is created: '{created_job_id}'")
        self.job_id = created_job_id
    return f"jobs/{self.job_id}/reports"

def request_params(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
    """Restrict the listed reports to those starting at/after start_time, when set."""
    params = {}
    if self.start_time:
        params["startTimeAtOrAfter"] = self.start_time
    return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """The reports listing is consumed in a single page — never paginate."""
    return None

Expand All @@ -105,21 +197,13 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
reports.sort(key=lambda x: x["startTime"])
date = kwargs["stream_state"].get("date")
if date:
reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) >= date]
reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) > date]
if not reports:
reports.append(None)
return reports

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
if not self.job_id:
self.job_id = self.jobs_resource.create(self.name)
self.logger.info(f"YouTube reporting job is created: '{self.job_id}'")
return "jobs/{}/reports".format(self.job_id)


class ChannelReports(HttpSubStream):
class ChannelReports(CustomBackoffMixin, HttpSubStream):
"https://developers.google.com/youtube/reporting/v1/reports/channel_reports"

name = None
Expand Down Expand Up @@ -194,13 +278,25 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
jobs = jobs_resource.list()
report_to_job_id = {j["reportTypeId"]: j["id"] for j in jobs}

# By default, API returns reports for last 60 days. Report for each day requires a separate request.
# Full scan of all 18 streams requires ~ 1100 requests (18+18*60), so we can hit 'default' API quota limits:
# - 60 reqs per minute
# - 20000 reqs per day
# For SAT: scan only last N days ('testing_period' option) in order to decrease a number of requests and avoid API limits
start_time = None
testing_period = config.get("testing_period")
if testing_period:
start_time = pendulum.today().add(days=-int(testing_period)).to_rfc3339_string()

channel_reports = json.loads(pkgutil.get_data("source_youtube_analytics", "defaults/channel_reports.json"))

streams = []
for channel_report in channel_reports:
stream_name = channel_report["id"]
dimensions = channel_report["dimensions"]
job_id = report_to_job_id.get(stream_name)
parent = ReportResources(name=stream_name, jobs_resource=jobs_resource, job_id=job_id, authenticator=authenticator)
parent = ReportResources(
name=stream_name, jobs_resource=jobs_resource, job_id=job_id, start_time=start_time, authenticator=authenticator
)
streams.append(ChannelReports(name=stream_name, dimensions=dimensions, parent=parent, authenticator=authenticator))
return streams
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections import OrderedDict
from unittest.mock import MagicMock

from source_youtube_analytics.source import ChannelReports, JobsResource, ReportResources
from source_youtube_analytics.source import ChannelReports, CustomBackoffMixin, JobsResource, ReportResources


def test_jobs_resource_list(requests_mock):
Expand Down Expand Up @@ -159,3 +159,71 @@ def test_channel_reports_parse_response():
OrderedDict([("date", "20211026"), ("channel_id", "UCybpwL6sPt6SSzazIV400WQ"), ("likes", "210"), ("dislikes", "21")]),
OrderedDict([("date", "20211026"), ("channel_id", "UCybpwL6sPt6SSzazIV400WQ"), ("likes", "150"), ("dislikes", "18")]),
]


def test_backoff_505():
    # Any 5xx status is treated as a transient server error and retried.
    server_error = MagicMock()
    server_error.status_code = 505
    assert CustomBackoffMixin().should_retry(server_error) is True


def test_backoff_429():
    # A bare 429 with no daily-quota payload is treated as the
    # per-minute limit and retried.
    throttled = MagicMock()
    throttled.status_code = 429
    assert CustomBackoffMixin().should_retry(throttled) is True


def test_backoff_429_per_minute_limit():
    # A 429 reporting the per-minute quota must still be retried.
    response = MagicMock()
    response.status_code = 429
    response.json.return_value = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
                {
                    "reason": "RATE_LIMIT_EXCEEDED",
                    "metadata": {
                        "consumer": "projects/863188056127",
                        "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                        "quota_limit_value": "60",
                        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                        "service": "youtubereporting.googleapis.com",
                    },
                }
            ],
        }
    }
    assert CustomBackoffMixin().should_retry(response) is True


def test_backoff_429_per_day_limit():
    # A 429 reporting the per-day quota must NOT be retried: the quota
    # only resets the next day, so backing off is pointless.
    response = MagicMock()
    response.status_code = 429
    response.json.return_value = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per day' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
                {
                    "reason": "RATE_LIMIT_EXCEEDED",
                    "metadata": {
                        "consumer": "projects/863188056127",
                        "quota_limit": "FreeQuotaRequestsPerDayPerProject",
                        "quota_limit_value": "20000",
                        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                        "service": "youtubereporting.googleapis.com",
                    },
                }
            ],
        }
    }
    mixin = CustomBackoffMixin()
    # The daily-quota branch logs an error; the bare mixin has no logger.
    mixin.logger = MagicMock()
    assert mixin.should_retry(response) is False
11 changes: 6 additions & 5 deletions docs/integrations/sources/youtube-analytics.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ Quota usage is not an issue because data is retrieved once and then filtered, so

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |
| Version | Date | Pull Request | Subject |
|:----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------|
| 0.1.3 | 2022-09-30 | [17454](https://github.com/airbytehq/airbyte/pull/17454) | Added custom backoff logic |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |

0 comments on commit 10b16db

Please sign in to comment.