Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source YouTube Analytics - added custom backoff logic #17454

Merged
merged 9 commits into from
Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1107,7 +1107,7 @@
- sourceDefinitionId: afa734e4-3571-11ec-991a-1e0031268139
name: YouTube Analytics
dockerRepository: airbyte/source-youtube-analytics
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/youtube-analytics
icon: youtube.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11525,7 +11525,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-youtube-analytics:0.1.2"
- dockerImage: "airbyte/source-youtube-analytics:0.1.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/youtube-analytics"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_youtube_analytics ./source_youtube_analytics
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-youtube-analytics
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tests:
- channel_demographics_a1
- channel_end_screens_a1
- channel_sharing_service_a1
- channel_province_a2
- playlist_basic_a1
- playlist_combined_a1
- playlist_device_os_a1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import pkgutil
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

import pendulum
import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand All @@ -18,7 +19,81 @@
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer


class JobsResource(HttpStream):
class CustomBackoffMixin:
    """Mixin that customizes HttpStream backoff for the YouTube Reporting API.

    Transient server errors and per-minute rate limits are retried with a long
    retry factor; exhausting the *daily* quota is not retried, because the next
    successful scan can reasonably start only ~1 day later.
    """

    def daily_quota_exceeded(self, response: "requests.Response") -> bool:
        """Return True when a 429 response reports the per-day quota limit.

        Response example:
        {
            "error": {
                "code": 429,
                "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
                "status": "RESOURCE_EXHAUSTED",
                "details": [
                    {
                        "reason": "RATE_LIMIT_EXCEEDED",
                        "metadata": {
                            "consumer": "projects/863188056127",
                            "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                            "quota_limit_value": "60",
                            "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                            "service": "youtubereporting.googleapis.com",
                        }
                    },
                ]
            }
        }

        :param response: the HTTP 429 response to inspect
        :return: True if the daily quota is exhausted, False otherwise
        """
        # A 429 body is not guaranteed to be JSON (e.g. an HTML error page
        # injected by a proxy) — treat an unparsable body as "not daily quota"
        # so the generic 429 retry logic still applies.
        try:
            details = response.json().get("error", {}).get("details", [])
        except ValueError:
            return False
        for detail in details:
            # Only the first RATE_LIMIT_EXCEEDED entry is examined.
            if detail.get("reason") == "RATE_LIMIT_EXCEEDED":
                metadata = detail.get("metadata", {})
                if metadata.get("quota_limit") == "FreeQuotaRequestsPerDayPerProject":
                    self.logger.error(f"Exceeded daily quota: {metadata.get('quota_limit_value')} reqs/day")
                    return True
                break
        return False

    def should_retry(self, response: "requests.Response") -> bool:
        """Decide whether a failed request should be retried.

        Back off on:
        - 500s, to handle transient server errors
        - 429 (Too Many Requests) for the per-minute quota:
            "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
            "quota_limit_value": "60",
          --> retried, using the increased retry_factor (30 seconds)

        A 429 for the per-day quota:
            "quota_limit": "FreeQuotaRequestsPerDayPerProject",
            "quota_limit_value": "20000",
        --> NOT retried: just throw an error, the next scan is reasonable to
            start only in ~1 day.
        """
        if 500 <= response.status_code < 600:
            return True
        return response.status_code == 429 and not self.daily_quota_exceeded(response)

    @property
    def retry_factor(self) -> float:
        """Default FreeQuotaRequestsPerMinutePerProject is 60 reqs/min, so a 30-second delay is reasonable."""
        return 30


class JobsResource(CustomBackoffMixin, HttpStream):
"""
https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs

Expand Down Expand Up @@ -79,19 +154,36 @@ def create(self, name):
return result["id"]


class ReportResources(HttpStream):
class ReportResources(CustomBackoffMixin, HttpStream):
"https://developers.google.com/youtube/reporting/v1/reference/rest/v1/jobs.reports/list"

name = None
primary_key = "id"
url_base = "https://youtubereporting.googleapis.com/v1/"

def __init__(self, name: str, jobs_resource: JobsResource, job_id: str, **kwargs):
def __init__(self, name: str, jobs_resource: JobsResource, job_id: Optional[str], start_time: Optional[str] = None, **kwargs):
    """One report-type stream over the jobs.reports list endpoint.

    :param name: report type id, also used as the stream name
    :param jobs_resource: JobsResource used to lazily create a reporting job when job_id is missing
    :param job_id: id of an existing reporting job, or None if one must be created on first request
    :param start_time: timestamp string sent as 'startTimeAtOrAfter'; only newer reports are listed
    :param kwargs: forwarded to the HttpStream base (e.g. authenticator)
    """
    self.name = name
    self.jobs_resource = jobs_resource
    self.job_id = job_id
    self.start_time = start_time
    super().__init__(**kwargs)

def path(
    self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
    """Return the reports collection path, creating the reporting job on first use."""
    job_id = self.job_id
    if not job_id:
        # No reporting job exists yet for this report type — create one lazily.
        job_id = self.jobs_resource.create(self.name)
        self.logger.info(f"YouTube reporting job is created: '{job_id}'")
        self.job_id = job_id
    return f"jobs/{job_id}/reports"

def request_params(
    self,
    stream_state: Mapping[str, Any],
    stream_slice: Mapping[str, Any] = None,
    next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
    """Limit the listed reports to those at/after self.start_time, when one is set."""
    params: MutableMapping[str, Any] = {}
    if self.start_time:
        params["startTimeAtOrAfter"] = self.start_time
    return params

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Always return None: only the first page of reports is read (no pagination)."""
    return None

Expand All @@ -105,21 +197,13 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
reports.sort(key=lambda x: x["startTime"])
date = kwargs["stream_state"].get("date")
if date:
reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) >= date]
reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) > date]
if not reports:
reports.append(None)
return reports

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
if not self.job_id:
self.job_id = self.jobs_resource.create(self.name)
self.logger.info(f"YouTube reporting job is created: '{self.job_id}'")
return "jobs/{}/reports".format(self.job_id)


class ChannelReports(HttpSubStream):
class ChannelReports(CustomBackoffMixin, HttpSubStream):
"https://developers.google.com/youtube/reporting/v1/reports/channel_reports"

name = None
Expand Down Expand Up @@ -194,13 +278,25 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
jobs = jobs_resource.list()
report_to_job_id = {j["reportTypeId"]: j["id"] for j in jobs}

# By default, API returns reports for last 60 days. Report for each day requires a separate request.
# Full scan of all 18 streams requires ~ 1100 requests (18+18*60), so we can hit 'default' API quota limits:
# - 60 reqs per minute
# - 20000 reqs per day
# For SAT: scan only last N days ('testing_period' option) in order to decrease a number of requests and avoid API limits
start_time = None
testing_period = config.get("testing_period")
if testing_period:
start_time = pendulum.today().add(days=-int(testing_period)).to_rfc3339_string()

channel_reports = json.loads(pkgutil.get_data("source_youtube_analytics", "defaults/channel_reports.json"))

streams = []
for channel_report in channel_reports:
stream_name = channel_report["id"]
dimensions = channel_report["dimensions"]
job_id = report_to_job_id.get(stream_name)
parent = ReportResources(name=stream_name, jobs_resource=jobs_resource, job_id=job_id, authenticator=authenticator)
parent = ReportResources(
name=stream_name, jobs_resource=jobs_resource, job_id=job_id, start_time=start_time, authenticator=authenticator
)
streams.append(ChannelReports(name=stream_name, dimensions=dimensions, parent=parent, authenticator=authenticator))
return streams
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from collections import OrderedDict
from unittest.mock import MagicMock

from source_youtube_analytics.source import ChannelReports, JobsResource, ReportResources
from source_youtube_analytics.source import ChannelReports, CustomBackoffMixin, JobsResource, ReportResources


def test_jobs_resource_list(requests_mock):
Expand Down Expand Up @@ -159,3 +159,71 @@ def test_channel_reports_parse_response():
OrderedDict([("date", "20211026"), ("channel_id", "UCybpwL6sPt6SSzazIV400WQ"), ("likes", "210"), ("dislikes", "21")]),
OrderedDict([("date", "20211026"), ("channel_id", "UCybpwL6sPt6SSzazIV400WQ"), ("likes", "150"), ("dislikes", "18")]),
]


def test_backoff_505():
    # Any 5XX response is considered a transient server error and must be retried.
    server_error = MagicMock()
    server_error.status_code = 505
    assert CustomBackoffMixin().should_retry(server_error) is True


def test_backoff_429():
    # A bare 429 with no daily-quota details in the body must be retried.
    rate_limited = MagicMock()
    rate_limited.status_code = 429
    assert CustomBackoffMixin().should_retry(rate_limited) is True


def test_backoff_429_per_minute_limit():
    # Hitting the per-minute quota is retryable (the quota resets quickly).
    error_body = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
                {
                    "reason": "RATE_LIMIT_EXCEEDED",
                    "metadata": {
                        "consumer": "projects/863188056127",
                        "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                        "quota_limit_value": "60",
                        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                        "service": "youtubereporting.googleapis.com",
                    },
                }
            ],
        }
    }
    response = MagicMock(status_code=429, json=MagicMock(return_value=error_body))
    assert CustomBackoffMixin().should_retry(response) is True


def test_backoff_429_per_day_limit():
    # Exhausting the daily quota must NOT be retried: it is logged and given up.
    error_body = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per day' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
                {
                    "reason": "RATE_LIMIT_EXCEEDED",
                    "metadata": {
                        "consumer": "projects/863188056127",
                        "quota_limit": "FreeQuotaRequestsPerDayPerProject",
                        "quota_limit_value": "20000",
                        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                        "service": "youtubereporting.googleapis.com",
                    },
                }
            ],
        }
    }
    response = MagicMock(status_code=429, json=MagicMock(return_value=error_body))
    mixin = CustomBackoffMixin()
    mixin.logger = MagicMock()  # bare mixin has no logger; the daily-quota path logs an error
    assert mixin.should_retry(response) is False
11 changes: 6 additions & 5 deletions docs/integrations/sources/youtube-analytics.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ Quota usage is not an issue because data is retrieved once and then filtered, so

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |
| Version | Date | Pull Request | Subject |
|:----------|:-----------|:---------------------------------------------------------|:-----------------------------------------------|
| 0.1.3 | 2022-09-30 | [17454](https://github.com/airbytehq/airbyte/pull/17454) | Added custom backoff logic |
| 0.1.2 | 2022-09-29 | [17399](https://github.com/airbytehq/airbyte/pull/17399) | Fixed `403` error while `check connection` |
| 0.1.1 | 2022-08-18 | [15744](https://github.com/airbytehq/airbyte/pull/15744) | Fix `channel_basic_a2` schema fields data type |
| 0.1.0 | 2021-11-01 | [7407](https://github.com/airbytehq/airbyte/pull/7407) | Initial Release |