Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source google ads: retry transient errors #20855

Merged
merged 5 commits into from
Jan 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ COPY main.py ./

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.5
LABEL io.airbyte.version=0.2.6
LABEL io.airbyte.name=airbyte/source-google-ads
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
#


import logging
from enum import Enum
from typing import Any, Iterator, List, Mapping, MutableMapping

import backoff
import pendulum
from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.v11.services.types.google_ads_service import GoogleAdsRow, SearchGoogleAdsResponse
from google.api_core.exceptions import ServerError, TooManyRequests
from proto.marshal.collections import Repeated, RepeatedComposite

REPORT_MAPPING = {
Expand All @@ -31,6 +34,7 @@
"keyword_report": "keyword_view",
}
API_VERSION = "v11"
logger = logging.getLogger("airbyte")


class GoogleAds:
Expand All @@ -43,13 +47,21 @@ def __init__(self, credentials: MutableMapping[str, Any]):
self.client = GoogleAdsClient.load_from_dict(credentials, version=API_VERSION)
self.ga_service = self.client.get_service("GoogleAdsService")

@backoff.on_exception(
backoff.expo,
(ServerError, TooManyRequests),
on_backoff=lambda details: logger.info(
f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} seconds then retrying..."
),
max_tries=5,
)
def send_request(self, query: str, customer_id: str) -> Iterator[SearchGoogleAdsResponse]:
client = self.client
search_request = client.get_type("SearchGoogleAdsRequest")
search_request.query = query
search_request.page_size = self.DEFAULT_PAGE_SIZE
search_request.customer_id = customer_id
yield self.ga_service.search(search_request)
return [self.ga_service.search(search_request)]

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v11.errors.types.errors import ErrorCode, GoogleAdsError, GoogleAdsFailure
from google.ads.googleads.v11.errors.types.request_error import RequestErrorEnum
from google.api_core.exceptions import DataLoss, InternalServerError, ResourceExhausted, TooManyRequests
from grpc import RpcError
from source_google_ads.google_ads import GoogleAds
from source_google_ads.streams import ClickView
Expand Down Expand Up @@ -193,3 +194,27 @@ def test_page_token_expired_it_should_fail_date_range_1_day(mock_ads_client, con

stream.get_query.assert_called_with({"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"})
assert stream.get_query.call_count == 1


@pytest.mark.parametrize("error_cls", (ResourceExhausted, TooManyRequests, InternalServerError, DataLoss))
def test_retry_transient_errors(mocker, config, customers, error_cls):
mocker.patch("time.sleep")
credentials = config["credentials"]
credentials.update(use_proto_plus=True)
api = GoogleAds(credentials=credentials)
mocked_search = mocker.patch.object(api.ga_service, "search", side_effect=error_cls("Error message"))
incremental_stream_config = dict(
api=api,
conversion_window_days=config["conversion_window_days"],
start_date=config["start_date"],
end_date="2021-04-04",
customers=customers,
)
stream = ClickView(**incremental_stream_config)
customer_id = next(iter(customers)).id
stream_slice = {"customer_id": customer_id, "start_date": "2021-01-03", "end_date": "2021-01-04"}
records = []
with pytest.raises(error_cls):
records = list(stream.read_records(sync_mode=SyncMode.incremental, cursor_field=["segments.date"], stream_slice=stream_slice))
assert mocked_search.call_count == 5
assert records == []
1 change: 1 addition & 0 deletions docs/integrations/sources/google-ads.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Due to a limitation in the Google Ads API which does not allow getting performan

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------|
| `0.2.6` | 2022-12-22 | [20855](https://github.com/airbytehq/airbyte/pull/20855) | Retry 429 and 5xx errors |
| `0.2.5` | 2022-11-22 | [19700](https://github.com/airbytehq/airbyte/pull/19700) | Fix schema for `campaigns` stream |
| `0.2.4` | 2022-11-09 | [19208](https://github.com/airbytehq/airbyte/pull/19208) | Add TypeTransofrmer to Campaings stream to force proper type casting |
| `0.2.3` | 2022-10-17 | [18069](https://github.com/airbytehq/airbyte/pull/18069) | Add `segments.hour`, `metrics.ctr`, `metrics.conversions` and `metrics.conversions_values` fields to `campaigns` report stream |
Expand Down