Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Google Ads: Implement multiple customer ids for google ads #10150

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.version=0.1.26
LABEL io.airbyte.name=airbyte/source-google-ads
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,16 @@ tests:
- config_path: "secrets/config.json"
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_empty_streams.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams:
[
"geographic_report",
"keyword_report",
"display_keyword_performance_report",
"display_topics_performance_report",
"shopping_performance_report",
]
timeout_seconds: 600
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json"
expect_records:
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"client_secret": "client_secret",
"refresh_token": "refresh_token"
},
"customer_id": "customer_id",
"customer_id": "4312523412",
"start_date": "2021-06-01",
"conversion_window_days": 14
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ def test_incremental_sync(config):
for record in records:
if record and record.type == Type.STATE:
print(record)
current_state = record.state.data["ad_group_ad_report"]["segments.date"]
temp_state = record.state.data["ad_group_ad_report"]
current_state = (
temp_state[config["customer_id"]]["segments.date"] if temp_state.get(config["customer_id"]) else temp_state["segments.date"]
)
if record and record.type == Type.RECORD:
assert record.record.data["segments.date"] >= current_state

Expand All @@ -71,7 +74,7 @@ def test_incremental_sync(config):

for record in records:
if record and record.type == Type.STATE:
current_state = record.state.data["ad_group_ad_report"]["segments.date"]
current_state = record.state.data["ad_group_ad_report"][config["customer_id"]]["segments.date"]
if record and record.type == Type.RECORD:
assert record.record.data["segments.date"] >= current_state

Expand All @@ -80,7 +83,6 @@ def test_incremental_sync(config):
records = google_ads_client.read(
AirbyteLogger(), config, ConfiguredAirbyteCatalog.parse_obj(SAMPLE_CATALOG), {"ad_group_ad_report": {"segments.date": state}}
)
current_state = pendulum.parse(state).subtract(days=14).to_date_string()

no_records = True
for record in records:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


from enum import Enum
from typing import Any, List, Mapping
from typing import Any, Iterator, List, Mapping

import pendulum
from google.ads.googleads.client import GoogleAdsClient
Expand Down Expand Up @@ -32,18 +32,21 @@ class GoogleAds:
DEFAULT_PAGE_SIZE = 1000

def __init__(self, credentials: Mapping[str, Any], customer_id: str):
    """
    Build the Google Ads client and record which customer accounts to query.

    :param credentials: settings forwarded to `GoogleAdsClient.load_from_dict`; the mapping itself is left untouched.
    :param customer_id: a single customer ID, or several IDs separated by commas.
    """
    # `google-ads` library version `14.0.0` and higher requires an additional required parameter `use_proto_plus`.
    # More details can be found here: https://developers.google.com/google-ads/api/docs/client-libs/python/protobuf-messages
    # The flag is added to a copy so the caller's mapping (which may be read-only) is never modified.
    client_settings = {**credentials, "use_proto_plus": True}

    self.client = GoogleAdsClient.load_from_dict(client_settings)
    # Raw configured value, kept next to the parsed list for code that still reads the single attribute.
    self.customer_id = customer_id
    # Tolerate stray whitespace around the commas ("123, 456").
    self.customer_ids = [single_id.strip() for single_id in customer_id.split(",")]
    self.ga_service = self.client.get_service("GoogleAdsService")

def send_request(self, query: str, customer_id: str) -> "Iterator[SearchGoogleAdsResponse]":
    """
    Run a search query against one customer account.

    `ga_service.search` is invoked when this method is called, not when the
    result is first iterated, so a caller that discards the return value
    (as the loop in `check_connection` does) still triggers the search and
    sees any exception it raises.

    :param query: query text placed on the search request.
    :param customer_id: ID of the customer account the request is issued for.
    :return: an iterator over the single object returned by `ga_service.search`.
    """
    search_request = self.client.get_type("SearchGoogleAdsRequest")
    search_request.customer_id = customer_id
    search_request.query = query
    search_request.page_size = self.DEFAULT_PAGE_SIZE

    # A `yield` here would turn the method into a generator and postpone the
    # search until the first `next()`; returning an iterator keeps the
    # iterable contract without that laziness.
    return iter([self.ga_service.search(search_request)])

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, List, Mapping, Tuple, Union

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from google.ads.googleads.errors import GoogleAdsException
Expand Down Expand Up @@ -46,7 +47,9 @@ def get_credentials(config: Mapping[str, Any]) -> Mapping[str, Any]:

@staticmethod
def get_account_info(google_api) -> dict:
    """
    Return the first record of the `Accounts` stream for its first stream slice.

    :param google_api: API wrapper passed to the `Accounts` stream.
    :return: the first record, or an empty dict when the stream produces no
        slice or the first slice produces no record.
    """
    accounts_stream = Accounts(api=google_api)
    for stream_slice in accounts_stream.stream_slices(sync_mode=SyncMode.full_refresh):
        # Only the first slice is consulted; the loop is just a safe way to take it.
        first_slice_records = accounts_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice)
        return next(first_slice_records, {})
    # No slice at all: honour the declared return type instead of implicitly returning None.
    return {}

@staticmethod
def get_time_zone(account: dict) -> Union[timezone, str]:
Expand Down Expand Up @@ -84,7 +87,8 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
raise Exception(f"Custom query should not contain {CustomQuery.cursor_field}")

req_q = CustomQuery.insert_segments_date_expr(query, "1980-01-01", "1980-01-01")
google_api.send_request(req_q)
for customer_id in google_api.customer_ids:
google_api.send_request(req_q, customer_id=customer_id)
return True, None
except GoogleAdsException as error:
return False, f"Unable to connect to Google Ads API with the provided credentials - {repr(error.failure)}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@
}
},
"customer_id": {
"title": "Customer ID",
"title": "Customer ID(s)",
"type": "string",
"description": "Customer ID must be specified as a 10-digit number without dashes. More instruction on how to find this value in our <a href=\"https://docs.airbyte.com/integrations/sources/google-ads#setup-guide\">docs</a>. Metrics streams like AdGroupAdReport cannot be requested for a manager account.",
"description": "Comma separated list of (client) customer IDs. Each customer ID must be specified as a 10-digit number without dashes. More instruction on how to find this value in our <a href=\"https://docs.airbyte.com/integrations/sources/google-ads#setup-guide\">docs</a>. Metrics streams like AdGroupAdReport cannot be requested for a manager account.",
"pattern": "^[0-9]{10}(,[0-9]{10})*$",
"examples": ["6783948572,5839201945"],
"order": 1
},
"start_date": {
Expand Down Expand Up @@ -98,6 +100,8 @@
"type": "string",
"title": "Login Customer ID for Managed Accounts (Optional)",
"description": "If your access to the customer account is through a manager account, this field is required and must be set to the customer ID of the manager account (10-digit number without dashes). More information about this field you can see <a href=\"https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid\">here</a>",
"pattern": "^([0-9]{10})?$",
"examples": ["7349206847"],
"order": 4
},
"conversion_window_days": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def chunk_date_range(
class GoogleAdsStream(Stream, ABC):
def __init__(self, api: GoogleAds):
self.google_ads_client = api
self._customer_id = None

def get_query(self, stream_slice: Mapping[str, Any]) -> str:
query = GoogleAds.convert_schema_into_query(schema=self.get_json_schema(), report_name=self.name)
Expand All @@ -93,9 +94,15 @@ def parse_response(self, response: SearchPager) -> Iterable[Mapping]:
for result in response:
yield self.google_ads_client.parse_single_result(self.get_json_schema(), result)

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
for customer_id in self.google_ads_client.customer_ids:
self._customer_id = customer_id
yield {}

def read_records(self, sync_mode, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
response = self.google_ads_client.send_request(self.get_query(stream_slice))
yield from self.parse_response(response)
account_responses = self.google_ads_client.send_request(self.get_query(stream_slice), customer_id=self._customer_id)
Copy link
Contributor

@keu keu Feb 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to your logic, customer_id will be None here. Is that acceptable to send_request?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the code a bit and overrode the stream_slices method for the class responsible for Full Refresh. Unfortunately, the solution is not very elegant, but it avoids redefining the _read_incremental method for the SourceGoogleAds class while still making the current customer_id available to any class method (in particular, the get_updated_state method).

for response in account_responses:
yield from self.parse_response(response)


class IncrementalGoogleAdsStream(GoogleAdsStream, ABC):
Expand All @@ -112,19 +119,30 @@ def __init__(self, start_date: str, conversion_window_days: int, time_zone: [pen
super().__init__(**kwargs)

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Yield date-range slices for every configured customer, one customer after another.

    The start date for a customer is taken, in order of preference, from:
    1. that customer's own entry in the state (`{customer_id: {cursor_field: date}}`);
    2. the old flat state (`{cursor_field: date}`), only when exactly one customer is configured;
    3. the configured start date.

    :param stream_state: saved state of the stream; `None` is treated as empty.
    :return: the slices produced by `chunk_date_range` for each customer.
    """
    stream_state = stream_state or {}
    customer_ids = self.google_ads_client.customer_ids
    single_customer = len(customer_ids) == 1

    for customer_id in customer_ids:
        # Other methods of this class read `_customer_id` to know which
        # customer the slice currently being processed belongs to.
        self._customer_id = customer_id

        customer_state = stream_state.get(customer_id)
        if customer_state:
            start_date = customer_state.get(self.cursor_field) or self._start_date
        elif single_customer and stream_state.get(self.cursor_field):
            # We should keep backward compatibility with the previous version
            start_date = stream_state[self.cursor_field]
        else:
            start_date = self._start_date

        yield from chunk_date_range(
            start_date=start_date,
            end_date=self._end_date,
            conversion_window=self.conversion_window_days,
            field=self.cursor_field,
            days_of_data_storage=self.days_of_data_storage,
            range_days=self.range_days,
            time_zone=self.time_zone,
        )

def read_records(
self,
Expand Down Expand Up @@ -152,13 +170,13 @@ def read_records(
# If range days is 1, no need in retry, because it's the minimum date range
self.logger.error("Page token has expired.")
raise exception
elif state.get(self.cursor_field) == stream_slice["start_date"]:
elif state.get(self._customer_id, {}).get(self.cursor_field) == stream_slice["start_date"]:
# It couldn't read all the records within one day, it will enter an infinite loop,
# so raise the error
self.logger.error("Page token has expired.")
raise exception
# Retry reading records from where it crushed
stream_slice["start_date"] = state.get(self.cursor_field, stream_slice["start_date"])
stream_slice["start_date"] = state.get(self._customer_id, {}).get(self.cursor_field, stream_slice["start_date"])
else:
# raise caught error for other error statuses
raise exception
Expand All @@ -169,16 +187,20 @@ def read_records(
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Advance the cursor stored for the customer currently being read.

    State is kept per customer as `{customer_id: {cursor_field: date}}`. An old
    flat `{cursor_field: date}` entry, if present, is removed and used as the
    previous cursor value.

    :param current_stream_state: state so far; `None` is treated as empty. It is updated in place.
    :param latest_record: the record that was just read.
    :return: the updated state mapping.
    """
    current_stream_state = current_stream_state or {}
    latest_cursor = latest_record[self.cursor_field]

    if current_stream_state.get(self.cursor_field):
        # Old flat state: drop the top-level cursor so only the per-customer form remains.
        previous_cursor = current_stream_state.pop(self.cursor_field)
    else:
        customer_state = current_stream_state.get(self._customer_id) or {}
        previous_cursor = customer_state.get(self.cursor_field)

    if previous_cursor:
        # The cursor never moves backwards: keep the later of the two dates.
        newest_cursor = max(pendulum.parse(previous_cursor), pendulum.parse(latest_cursor)).to_date_string()
    else:
        # When state is none return date from latest record
        newest_cursor = latest_cursor

    current_stream_state.update({self._customer_id: {self.cursor_field: newest_cursor}})
    return current_stream_state

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ def __getattr__(self, attr):
"client_id": "client_id",
"client_secret": "client_secret",
"refresh_token": "refresh_token",
"use_proto_plus": True,
}


def test_google_ads_init(mocker):
google_client_mocker = mocker.patch("source_google_ads.google_ads.GoogleAdsClient", return_value=MockGoogleAdsClient)
google_ads_client = GoogleAds(**SAMPLE_CONFIG)
assert google_ads_client.customer_id == SAMPLE_CONFIG["customer_id"]
assert google_ads_client.customer_ids == SAMPLE_CONFIG["customer_id"].split(",")
assert google_client_mocker.load_from_dict.call_args[0][0] == EXPECTED_CRED


Expand All @@ -62,11 +63,12 @@ def test_send_request(mocker):
google_ads_client = GoogleAds(**SAMPLE_CONFIG)
query = "Query"
page_size = 1000
response = google_ads_client.send_request(query)
customer_id = SAMPLE_CONFIG["customer_id"].split(",")[0]
response = list(google_ads_client.send_request(query, customer_id=customer_id))

assert response.customer_id == SAMPLE_CONFIG["customer_id"]
assert response.query == query
assert response.page_size == page_size
assert response[0].customer_id == customer_id
assert response[0].query == query
assert response[0].page_size == page_size


def test_get_fields_from_schema():
Expand Down
Loading