diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 45114bfa9e87..f65ec99cf1c6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -1107,7 +1107,7 @@ - sourceDefinitionId: afa734e4-3571-11ec-991a-1e0031268139 name: YouTube Analytics dockerRepository: airbyte/source-youtube-analytics - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/sources/youtube-analytics icon: youtube.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 227064bc1db3..6da781cc8a5f 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -11525,7 +11525,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-youtube-analytics:0.1.2" +- dockerImage: "airbyte/source-youtube-analytics:0.1.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/youtube-analytics" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-youtube-analytics/Dockerfile b/airbyte-integrations/connectors/source-youtube-analytics/Dockerfile index 5f59df26be35..572a4dfaa5cc 100644 --- a/airbyte-integrations/connectors/source-youtube-analytics/Dockerfile +++ b/airbyte-integrations/connectors/source-youtube-analytics/Dockerfile @@ -34,5 +34,5 @@ COPY source_youtube_analytics ./source_youtube_analytics ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-youtube-analytics diff --git 
class CustomBackoffMixin:
    """Retry policy shared by the streams that call the YouTube Reporting API.

    The mixin is meant to be listed before ``HttpStream`` in a stream's bases so
    that the ``should_retry`` and ``retry_factor`` defined here take precedence.
    The host class is expected to provide ``self.logger``.
    """

    def daily_quota_exceeded(self, response: requests.Response) -> bool:
        """Tell whether a response reports that the *daily* request quota is exhausted.

        Response example:
        {
          "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
            "status": "RESOURCE_EXHAUSTED",
            "details": [
              {
                "reason": "RATE_LIMIT_EXCEEDED",
                "metadata": {
                  "consumer": "projects/863188056127",
                  "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
                  "quota_limit_value": "60",
                  "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
                  "service": "youtubereporting.googleapis.com"
                }
              }
            ]
          }
        }

        Only the first ``RATE_LIMIT_EXCEEDED`` entry of ``error.details`` is
        consulted. A body that is not JSON, or that does not have the shape
        shown above, is treated as "not a daily quota error" instead of raising,
        so the caller can still apply its normal retry rules.

        :param response: HTTP response to inspect
        :return: True when the quota limit named in the response is
            ``FreeQuotaRequestsPerDayPerProject``, False otherwise
        """
        try:
            payload = response.json()
        except ValueError:
            # The body is not JSON (e.g. an HTML error page): nothing to inspect.
            return False

        # Guard every level: an unexpected shape must not crash the retry decision.
        error = payload.get("error") if isinstance(payload, dict) else None
        details = error.get("details") if isinstance(error, dict) else None
        if not isinstance(details, list):
            return False

        for detail in details:
            if not isinstance(detail, dict) or detail.get("reason") != "RATE_LIMIT_EXCEEDED":
                continue
            metadata = detail.get("metadata")
            if not isinstance(metadata, dict):
                metadata = {}
            if metadata.get("quota_limit") == "FreeQuotaRequestsPerDayPerProject":
                self.logger.error(f"Exceeded daily quota: {metadata.get('quota_limit_value')} reqs/day")
                return True
            # The first rate-limit entry decides; later entries are not examined.
            break
        return False

    def should_retry(self, response: requests.Response) -> bool:
        """
        Override to set different conditions for backoff based on the response from the server.

        Back off on the following HTTP response statuses:
         - 500s to handle transient server errors
         - 429 (Too Many Requests) indicating rate limiting:
            Different behavior in case of 'RATE_LIMIT_EXCEEDED':

            Requests Per Minute:
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'."
            "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
            "quota_limit_value": "60",

            --> use increased retry_factor (30 seconds)

            Requests Per Day:
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per day' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'."
            "quota_limit": "FreeQuotaRequestsPerDayPerProject",
            "quota_limit_value": "20000",

            --> just throw an error, next scan is reasonable to start only in 1 day.

        :param response: HTTP response received from the API
        :return: True when the request should be retried, False otherwise
        """
        if 500 <= response.status_code < 600:
            return True

        # A 429 is retried unless it reports the daily quota, which a short backoff cannot cure.
        if response.status_code == 429 and not self.daily_quota_exceeded(response):
            return True

        return False

    @property
    def retry_factor(self) -> float:
        """
        Default FreeQuotaRequestsPerMinutePerProject is 60 reqs/min, so reasonable delay is 30 seconds
        """
        return 30
= kwargs["stream_state"].get("date") if date: - reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) >= date] + reports = [r for r in reports if int(r["startTime"].date().strftime("%Y%m%d")) > date] if not reports: reports.append(None) return reports - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - if not self.job_id: - self.job_id = self.jobs_resource.create(self.name) - self.logger.info(f"YouTube reporting job is created: '{self.job_id}'") - return "jobs/{}/reports".format(self.job_id) - -class ChannelReports(HttpSubStream): +class ChannelReports(CustomBackoffMixin, HttpSubStream): "https://developers.google.com/youtube/reporting/v1/reports/channel_reports" name = None @@ -194,6 +278,16 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: jobs = jobs_resource.list() report_to_job_id = {j["reportTypeId"]: j["id"] for j in jobs} + # By default, API returns reports for last 60 days. Report for each day requires a separate request. 
+ # Full scan of all 18 streams requires ~ 1100 requests (18+18*60), so we can hit 'default' API quota limits: + # - 60 reqs per minute + # - 20000 reqs per day + # For SAT: scan only last N days ('testing_period' option) in order to decrease a number of requests and avoid API limits + start_time = None + testing_period = config.get("testing_period") + if testing_period: + start_time = pendulum.today().add(days=-int(testing_period)).to_rfc3339_string() + channel_reports = json.loads(pkgutil.get_data("source_youtube_analytics", "defaults/channel_reports.json")) streams = [] @@ -201,6 +295,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: stream_name = channel_report["id"] dimensions = channel_report["dimensions"] job_id = report_to_job_id.get(stream_name) - parent = ReportResources(name=stream_name, jobs_resource=jobs_resource, job_id=job_id, authenticator=authenticator) + parent = ReportResources( + name=stream_name, jobs_resource=jobs_resource, job_id=job_id, start_time=start_time, authenticator=authenticator + ) streams.append(ChannelReports(name=stream_name, dimensions=dimensions, parent=parent, authenticator=authenticator)) return streams diff --git a/airbyte-integrations/connectors/source-youtube-analytics/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-youtube-analytics/unit_tests/test_streams.py index 4c00115549e2..70dde9c004ad 100644 --- a/airbyte-integrations/connectors/source-youtube-analytics/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-youtube-analytics/unit_tests/test_streams.py @@ -6,7 +6,7 @@ from collections import OrderedDict from unittest.mock import MagicMock -from source_youtube_analytics.source import ChannelReports, JobsResource, ReportResources +from source_youtube_analytics.source import ChannelReports, CustomBackoffMixin, JobsResource, ReportResources def test_jobs_resource_list(requests_mock): @@ -159,3 +159,71 @@ def test_channel_reports_parse_response(): 
def test_backoff_505():
    """A 5xx status must be reported as retryable."""
    server_error = MagicMock(status_code=505)
    assert CustomBackoffMixin().should_retry(server_error) is True


def test_backoff_429():
    """A 429 whose body carries no quota details must be reported as retryable."""
    throttled = MagicMock(status_code=429)
    assert CustomBackoffMixin().should_retry(throttled) is True


def test_backoff_429_per_minute_limit():
    """A 429 caused by the per-minute quota must be reported as retryable."""
    per_minute_metadata = {
        "consumer": "projects/863188056127",
        "quota_limit": "FreeQuotaRequestsPerMinutePerProject",
        "quota_limit_value": "60",
        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
        "service": "youtubereporting.googleapis.com",
    }
    payload = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per minute' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127'.",
            "status": "RESOURCE_EXHAUSTED",
            "details": [{"reason": "RATE_LIMIT_EXCEEDED", "metadata": per_minute_metadata}],
        }
    }
    throttled = MagicMock(status_code=429)
    throttled.json.return_value = payload

    assert CustomBackoffMixin().should_retry(throttled) is True


def test_backoff_429_per_day_limit():
    """A 429 caused by the per-day quota must NOT be retried."""
    per_day_metadata = {
        "consumer": "projects/863188056127",
        "quota_limit": "FreeQuotaRequestsPerDayPerProject",
        "quota_limit_value": "20000",
        "quota_metric": "youtubereporting.googleapis.com/free_quota_requests",
        "service": "youtubereporting.googleapis.com",
    }
    payload = {
        "error": {
            "code": 429,
            "message": "Quota exceeded for quota metric 'Free requests' and limit 'Free requests per day' of service 'youtubereporting.googleapis.com' for consumer 'project_number:863188056127",
            "status": "RESOURCE_EXHAUSTED",
            "details": [{"reason": "RATE_LIMIT_EXCEEDED", "metadata": per_day_metadata}],
        }
    }
    exhausted = MagicMock(status_code=429)
    exhausted.json.return_value = payload

    # The mixin logs through ``self.logger``, which the host stream normally supplies.
    mixin = CustomBackoffMixin()
    mixin.logger = MagicMock()
    assert mixin.should_retry(exhausted) is False