Skip to content

Commit

Permalink
🎉 Google Ads improvement: Support user-specified queries (#5302)
Browse files Browse the repository at this point in the history
* Add Google Ads custom queries stream

* Display a link to the Gradle scan in the PR comment if the test build failed
  • Loading branch information
vovavovavovavova authored Sep 10, 2021
1 parent 5f697ac commit aa9786d
Show file tree
Hide file tree
Showing 16 changed files with 628 additions and 18 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ jobs:
comment-id: ${{ github.event.inputs.comment-id }}
body: |
> :x: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
> :bug: ${{env.GRADLE_SCAN_LINK}}
# In case of self-hosted EC2 errors, remove this block.
stop-test-runner:
name: Stop Build EC2 Runner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-google-ads
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_empty_streams.json"
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# ad_group_ad_report: ["segments.date"]
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json"
expect_records:
path: "integration_tests/expected_records_msg.txt"
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# ad_group_ad_report: ["segments.date"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"streams": [
{
"stream": {
"name": "ad_group_custom",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,28 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["campaign.start_date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["campaign.start_date"]
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["customer.id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-google-ads/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads", "pendulum"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads==13.0.0", "pendulum"]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "pendulum"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock"]

setup(
name="source_google_ads",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import re
from functools import lru_cache
from typing import Any, Dict, List, Mapping

from .streams import IncrementalGoogleAdsStream


class CustomQuery(IncrementalGoogleAdsStream):
    """
    Incremental stream driven by a user-supplied GAQL query.

    The config entry must contain "query" (the GAQL text) and "table_name"
    (used as the stream name). The stream injects a `segments.date BETWEEN`
    condition into the user's query so it can be sliced by date windows,
    and derives the JSON schema by asking the Google Ads field-metadata API
    for the selected columns' data types.
    """

    def __init__(self, custom_query_config, **kwargs):
        # Keep the raw config entry around: `name` and `primary_key` read it lazily.
        self.custom_query_config = custom_query_config
        self.user_defined_query = custom_query_config["query"]
        super().__init__(**kwargs)

    @property
    def primary_key(self) -> str:
        """
        The primary_key option is disabled. Config should not provide the primary key.
        It will be ignored if provided.
        If you need to enable it, uncomment the next line instead of `return None` and modify your config
        """
        # return self.custom_query_config.get("primary_key") or None
        return None

    @property
    def name(self):
        # Stream name shown in the catalog comes straight from the user's config.
        return self.custom_query_config["table_name"]

    def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
        """Return the user's GAQL query with the slice's date window injected."""
        # get_date_params is inherited from IncrementalGoogleAdsStream — assumes it
        # yields (start, end) date strings for the slice; TODO confirm in streams.py.
        start_date, end_date = self.get_date_params(stream_slice, self.cursor_field)
        return self.insert_segments_date_expr(self.user_defined_query, start_date, end_date)

    # IncrementalGoogleAdsStream uses get_json_schema a lot while parsing
    # responses, so caching plays a crucial role for performance here.
    # NOTE(review): lru_cache on an instance method keeps the instance alive for
    # the cache's lifetime; acceptable here since streams live for the whole sync.
    @lru_cache()
    def get_json_schema(self) -> Dict[str, Any]:
        """
        Compose json schema based on user defined query.
        :return Dict object representing jsonschema
        """

        local_json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {},
            "additionalProperties": True,
        }
        # full list {'ENUM', 'STRING', 'DATE', 'DOUBLE', 'RESOURCE_NAME', 'INT32', 'INT64', 'BOOLEAN', 'MESSAGE'}

        google_datatype_mapping = {
            "INT64": "integer",
            "INT32": "integer",
            "DOUBLE": "number",
            "STRING": "string",
            "BOOLEAN": "boolean",
            "DATE": "string",
        }
        fields = CustomQuery.get_query_fields(self.user_defined_query)
        # The cursor column is always appended because insert_segments_date_expr
        # adds `segments.date` to the SELECT list of every sliced query.
        fields.append(self.cursor_field)
        google_schema = self.google_ads_client.get_fields_metadata(fields)

        for field in fields:
            node = google_schema.get(field)
            # Data type return in enum format: "GoogleAdsFieldDataType.<data_type>"
            google_data_type = str(node.data_type).replace("GoogleAdsFieldDataType.", "")
            if google_data_type == "ENUM":
                field_value = {"type": "string", "enum": list(node.enum_values)}
            elif google_data_type == "MESSAGE":
                # Represents protobuf message and could be anything, set custom
                # attribute "protobuf_message" to convert it to a string (or
                # array of strings) later.
                # https://developers.google.com/google-ads/api/reference/rpc/v8/GoogleAdsFieldDataTypeEnum.GoogleAdsFieldDataType?hl=en#message
                if node.is_repeated:
                    output_type = ["array", "null"]
                else:
                    output_type = ["string", "null"]
                field_value = {"type": output_type, "protobuf_message": True}
            else:
                # Unknown scalar types degrade to "string"; every column is nullable.
                output_type = [google_datatype_mapping.get(google_data_type, "string"), "null"]
                field_value = {"type": output_type}
            local_json_schema["properties"][field] = field_value

        return local_json_schema

    # Regexp flags for parsing GAQL query
    RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
    # Regexp for getting query columns
    # NOTE(review): the greedy `(.*)` matches up to the LAST "from" in the query;
    # a field or literal containing "from" would confuse it — relies on GAQL grammar.
    SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
    WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
    # list of keywords that can come after WHERE clause,
    # according to https://developers.google.com/google-ads/api/docs/query/grammar
    KEYWORDS_EXPR = re.compile("(order by|limit|parameters)", flags=RE_FLAGS)

    @staticmethod
    def get_query_fields(query: str) -> List[str]:
        """Extract the list of selected column names from a GAQL query."""
        fields = CustomQuery.SELECT_EXPR.search(query)
        if not fields:
            # Not a SELECT ... FROM query — nothing to extract.
            return []
        fields = fields.group(1)
        return [f.strip() for f in fields.split(",")]

    @staticmethod
    def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
        """
        Insert segments.date condition to break query into slices for incremental stream.
        :param query Origin user defined query
        :param start_date start date for metric (inclusive)
        :param end_date end date for metric (inclusive)
        :return Modified query with date window condition included
        """
        # insert segments.date field
        columns = CustomQuery.SELECT_EXPR.search(query)
        if not columns:
            raise Exception("Not valid GAQL expression")
        columns = columns.group(1)
        new_columns = columns + ", segments.date\n"
        # str.replace substitutes the first occurrence of the captured column
        # span — exactly the one SELECT_EXPR matched.
        result_query = query.replace(columns, new_columns)

        # Modify/insert where condition
        where_cond = CustomQuery.WHERE_EXPR.search(result_query)
        if not where_cond:
            # There is no where condition, insert new one
            where_location = len(result_query)
            keywords = CustomQuery.KEYWORDS_EXPR.search(result_query)
            if keywords:
                # where condition is not at the end of expression, insert new condition before keyword begins.
                where_location = keywords.start()
            result_query = (
                result_query[0:where_location]
                + f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
                + result_query[where_location:]
            )
            return result_query
        # There is already where condition, add segments.date expression
        where_cond = where_cond.group(0)
        keywords = CustomQuery.KEYWORDS_EXPR.search(where_cond)
        if keywords:
            # There is some keywords after WHERE condition
            where_cond = where_cond[0 : keywords.start()]
        new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
        result_query = result_query.replace(where_cond, new_where_cond)
        return result_query
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
# SOFTWARE.
#


from enum import Enum
from typing import Any, List, Mapping

Expand Down Expand Up @@ -60,10 +61,32 @@ def send_request(self, query: str) -> SearchGoogleAdsResponse:

return self.ga_service.search(search_request)

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
    """
    Issue Google API request to get detailed information on data type for custom query columns.
    :params fields list of columns for user defined query.
    :return dict of fields type info.
    """
    # GoogleAdsFieldService serves metadata (name, data_type, enum_values,
    # is_repeated) for every GAQL-selectable field.
    ga_field_service = self.client.get_service("GoogleAdsFieldService")
    request = self.client.get_type("SearchGoogleAdsFieldsRequest")
    # One result per requested field name, so a single page suffices.
    request.page_size = len(fields)
    # Quote each name for the `IN (...)` clause of the field-metadata query.
    fields_sql = ",".join([f"'{field}'" for field in fields])
    request.query = f"""
    SELECT
      name,
      data_type,
      enum_values,
      is_repeated
    WHERE name in ({fields_sql})
    """
    response = ga_field_service.search_google_ads_fields(request=request)
    # Key the metadata rows by field name for O(1) lookup while building the schema.
    return {r.name: r for r in response}

@staticmethod
def get_fields_from_schema(schema: Mapping[str, Any]) -> List[str]:
properties = schema.get("properties")
return [*properties]
return list(properties.keys())

@staticmethod
def convert_schema_into_query(
Expand All @@ -82,7 +105,7 @@ def convert_schema_into_query(
return query_template

@staticmethod
def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
def get_field_value(field_value: GoogleAdsRow, field: str, schema_type: Mapping[str, Any]) -> str:
field_name = field.split(".")
for level_attr in field_name:
"""
Expand Down Expand Up @@ -130,7 +153,6 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
# In GoogleAdsRow there are attributes that add an underscore at the end in their name.
# For example, 'ad_group_ad.ad.type' is replaced by 'ad_group_ad.ad.type_'.
field_value = getattr(field_value, level_attr + "_", None)

if isinstance(field_value, Enum):
field_value = field_value.name
elif isinstance(field_value, (Repeated, RepeatedComposite)):
Expand All @@ -144,13 +166,23 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
# For example:
# 1. ad_group_ad.ad.responsive_display_ad.long_headline - type AdTextAsset (https://developers.google.com/google-ads/api/reference/rpc/v6/AdTextAsset?hl=en).
# 2. ad_group_ad.ad.legacy_app_install_ad - type LegacyAppInstallAdInfo (https://developers.google.com/google-ads/api/reference/rpc/v7/LegacyAppInstallAdInfo?hl=en).
#
if not (isinstance(field_value, (list, int, float, str, bool, dict)) or field_value is None):
field_value = str(field_value)
# In case of custom query field has MESSAGE type it represents protobuf
# message and could be anything, convert it to a string or array of
# string if it has "repeated" flag on metadata
if schema_type.get("protobuf_message"):
if "array" in schema_type.get("type"):
field_value = [str(field) for field in field_value]
else:
field_value = str(field_value)

return field_value

@staticmethod
def parse_single_result(schema: Mapping[str, Any], result: GoogleAdsRow):
    """
    Convert one GoogleAdsRow into a flat record dict keyed by schema field names.

    :param schema: jsonschema for the stream; its "properties" entries are passed
        to get_field_value so protobuf MESSAGE fields can be stringified.
    :param result: a single row from the Google Ads search response.
    :return: dict mapping field name -> extracted value.
    """
    # Fix: the diffed source kept the stale 2-argument comprehension line before
    # the 3-argument one; executing it would TypeError against the new
    # get_field_value(result, field, schema_type) signature. Only the new call remains.
    props = schema.get("properties")
    single_record = {field: GoogleAds.get_field_value(result, field, props.get(field)) for field in fields}
    return single_record
Loading

0 comments on commit aa9786d

Please sign in to comment.