
🎉 Google Ads improvement: Support user-specified queries #5302

Merged
merged 39 commits
Sep 10, 2021
7ba083b
save the state
vovavovavovavova Aug 5, 2021
0b57c89
save big-big-big draft with dirty pre-upgrade
vovavovavovavova Aug 10, 2021
7ba9a5c
save draft
vovavovavovavova Aug 10, 2021
0b343da
rm prints and schemas on fly
vovavovavovavova Aug 10, 2021
cbb274a
fix
vovavovavovavova Aug 10, 2021
7c4a212
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 10, 2021
a4ddc06
update spec
vovavovavovavova Aug 10, 2021
5a27b95
cleanup
vovavovavovavova Aug 10, 2021
00b9fa1
upgrade configured_catalog for tests
vovavovavovavova Aug 10, 2021
6a2d3d7
rm previously used (now not) parameter
vovavovavovavova Aug 10, 2021
7e4c397
upd suggestion
vovavovavovavova Aug 10, 2021
2c9efa3
format && cleanup imports
vovavovavovavova Aug 10, 2021
b297a6a
remove outdated
vovavovavovavova Aug 10, 2021
80c6e2d
dedup code
vovavovavovavova Aug 11, 2021
99c8e02
dedup 2 lines
vovavovavovavova Aug 11, 2021
4e17d84
clean
vovavovavovavova Aug 11, 2021
c7256c6
Update airbyte-integrations/connectors/source-google-ads/source_googl…
vovavovavovavova Aug 11, 2021
c31c3fd
rm discovery modification; apply get_json_schema instead.
vovavovavovavova Aug 11, 2021
78adddf
upgrade spec; query with field separator
vovavovavovavova Aug 11, 2021
e17a320
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 11, 2021
14e80e1
move def process_query inside get_fields_from_schema
vovavovavovavova Aug 11, 2021
f43c29f
re-write to incremental with 2 moments
vovavovavovavova Aug 13, 2021
5775511
add pattern for basic query validation
vovavovavovavova Aug 13, 2021
a7abb83
add other resources than campaign
vovavovavovavova Aug 13, 2021
dd737dd
add as anytype?
vovavovavovavova Aug 13, 2021
ad8b2c0
dedup code
vovavovavovavova Aug 13, 2021
baae471
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 13, 2021
1ce8bf7
cleanup+ typo fix
vovavovavovavova Aug 13, 2021
a4685dc
merge; add a lot of resources; fix pattern(allow other resources); fi…
vovavovavovavova Aug 25, 2021
bbd678f
disable incremental in custom queries (requested)
vovavovavovavova Aug 26, 2021
7fa232f
Update airbyte-integrations/connectors/source-google-ads/source_googl…
vovavovavovavova Aug 30, 2021
a7b8916
add unit tests
vovavovavovavova Aug 31, 2021
0ba4690
rm double set
vovavovavovavova Aug 31, 2021
9aaab72
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 31, 2021
2260ca2
set prev version since https://github.com/googleads/google-ads-python…
vovavovavovavova Aug 31, 2021
3ee960b
Compose json schema based on metadata request
Sep 6, 2021
c5d55de
Add incremental stream support
Sep 7, 2021
612ffc3
Add gradle link on CI comment when build failed.
Sep 7, 2021
22be66e
Fix review comments
Sep 9, 2021
1 change: 1 addition & 0 deletions .github/workflows/test-command.yml
@@ -205,6 +205,7 @@ jobs:
comment-id: ${{ github.event.inputs.comment-id }}
body: |
> :x: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
> :bug: ${{env.GRADLE_SCAN_LINK}}
# In case of self-hosted EC2 errors, remove this block.
stop-test-runner:
name: Stop Build EC2 Runner
@@ -13,5 +13,5 @@ RUN pip install .

ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-google-ads
@@ -14,13 +14,17 @@ tests:
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_without_empty_streams.json"
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# ad_group_ad_report: ["segments.date"]
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json"
expect_records:
path: "integration_tests/expected_records_msg.txt"
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
# incremental:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state_path: "integration_tests/abnormal_state.json"
# cursor_paths:
# ad_group_ad_report: ["segments.date"]
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
@@ -99,6 +99,24 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
Contributor

What is this for?

@vovavovavovavova (Contributor Author) Aug 11, 2021

From the user side, the catalog command-line argument is taken from the discover output. On our side, for read we use the console command python main.py read --config ... --catalog ..., and this entry fills in that command so we can run it and get data from the source.

"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
@@ -0,0 +1,16 @@
{
"streams": [
{
"stream": {
"name": "ad_group_custom",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["segments.date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite",
"cursor_field": ["segments.date"]
}
]
}
@@ -63,6 +63,28 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["campaign.start_date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["campaign.start_date"]
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["customer.id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-google-ads/setup.py
@@ -25,9 +25,9 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads", "pendulum"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads==13.0.0", "pendulum"]

TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "pendulum"]
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock"]

setup(
name="source_google_ads",
@@ -0,0 +1,165 @@
#
# MIT License
#
# Copyright (c) 2020 Airbyte
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#

import re
from functools import lru_cache
from typing import Any, Dict, List, Mapping

from .streams import IncrementalGoogleAdsStream


class CustomQuery(IncrementalGoogleAdsStream):
def __init__(self, custom_query_config, **kwargs):
self.custom_query_config = custom_query_config
self.user_defined_query = custom_query_config["query"]
super().__init__(**kwargs)

@property
def primary_key(self) -> str:
"""
The primary_key option is disabled. Config should not provide the primary key.
It will be ignored if provided.
If you need to enable it, uncomment the next line instead of `return None` and modify your config
"""
# return self.custom_query_config.get("primary_key") or None
return None

@property
def name(self):
return self.custom_query_config["table_name"]

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
start_date, end_date = self.get_date_params(stream_slice, self.cursor_field)
return self.insert_segments_date_expr(self.user_defined_query, start_date, end_date)

# IncrementalGoogleAdsStream calls get_json_schema frequently while
# parsing responses, so caching plays a crucial role for performance here.
@lru_cache()
def get_json_schema(self) -> Dict[str, Any]:
"""
Compose json schema based on user defined query.
:return Dict object representing jsonschema
"""

local_json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {},
"additionalProperties": True,
}
# full list {'ENUM', 'STRING', 'DATE', 'DOUBLE', 'RESOURCE_NAME', 'INT32', 'INT64', 'BOOLEAN', 'MESSAGE'}

google_datatype_mapping = {
"INT64": "integer",
"INT32": "integer",
"DOUBLE": "number",
"STRING": "string",
"BOOLEAN": "boolean",
"DATE": "string",
}
fields = CustomQuery.get_query_fields(self.user_defined_query)
fields.append(self.cursor_field)
google_schema = self.google_ads_client.get_fields_metadata(fields)

for field in fields:
node = google_schema.get(field)
# Data type is returned in enum format: "GoogleAdsFieldDataType.<data_type>"
google_data_type = str(node.data_type).replace("GoogleAdsFieldDataType.", "")
if google_data_type == "ENUM":
field_value = {"type": "string", "enum": list(node.enum_values)}
elif google_data_type == "MESSAGE":
# Represents a protobuf message and could be anything; set the
# custom attribute "protobuf_message" so it can be converted to a
# string (or array of strings) later.
# https://developers.google.com/google-ads/api/reference/rpc/v8/GoogleAdsFieldDataTypeEnum.GoogleAdsFieldDataType?hl=en#message
if node.is_repeated:
output_type = ["array", "null"]
else:
output_type = ["string", "null"]
field_value = {"type": output_type, "protobuf_message": True}
else:
output_type = [google_datatype_mapping.get(google_data_type, "string"), "null"]
field_value = {"type": output_type}
local_json_schema["properties"][field] = field_value

return local_json_schema
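The datatype-to-jsonschema mapping above can be sketched in isolation. The sketch below takes plain strings and flags instead of the GoogleAdsFieldService metadata objects used in the PR, which is an assumed simplification for illustration:

```python
# Hypothetical standalone sketch of the type-mapping step in
# get_json_schema above; names and inputs are simplified assumptions.
GOOGLE_DATATYPE_MAPPING = {
    "INT64": "integer",
    "INT32": "integer",
    "DOUBLE": "number",
    "STRING": "string",
    "BOOLEAN": "boolean",
    "DATE": "string",
}

def to_jsonschema_property(data_type: str, is_repeated: bool = False, enum_values=None) -> dict:
    if data_type == "ENUM":
        # Enums become string properties constrained to the enum values.
        return {"type": "string", "enum": list(enum_values or [])}
    if data_type == "MESSAGE":
        # Protobuf message: stringified later, element-wise if repeated.
        output_type = ["array", "null"] if is_repeated else ["string", "null"]
        return {"type": output_type, "protobuf_message": True}
    # Unknown types fall back to string; every field is nullable.
    return {"type": [GOOGLE_DATATYPE_MAPPING.get(data_type, "string"), "null"]}
```

Note that every non-enum property is emitted as nullable, matching the `["<type>", "null"]` pattern in the schema composed above.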

# Regexp flags for parsing GAQL query
RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
# Regexp for getting query columns
SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
# list of keywords that can come after WHERE clause,
# according to https://developers.google.com/google-ads/api/docs/query/grammar
KEYWORDS_EXPR = re.compile("(order by|limit|parameters)", flags=RE_FLAGS)

@staticmethod
def get_query_fields(query: str) -> List[str]:
fields = CustomQuery.SELECT_EXPR.search(query)
if not fields:
return []
fields = fields.group(1)
return [f.strip() for f in fields.split(",")]

@staticmethod
def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
"""
Insert segments.date condition to break query into slices for incremental stream.
:param query Original user-defined query
:param start_date start date for metric (inclusive)
:param end_date end date for metric (inclusive)
:return Modified query with date window condition included
"""
# insert segments.date field
columns = CustomQuery.SELECT_EXPR.search(query)
if not columns:
raise Exception("Invalid GAQL expression")
columns = columns.group(1)
new_columns = columns + ", segments.date\n"
result_query = query.replace(columns, new_columns)

# Modify/insert where condition
where_cond = CustomQuery.WHERE_EXPR.search(result_query)
Contributor

This is a very cool approach, I like it. The only issue I see here is that WHERE could contain a time-bound function like LAST_14_DAYS; wouldn't that conflict with this incremental logic?

In this case I think we should not add the WHERE condition and just default to what the user specified.

Contributor

In this case I've added a check in the check_connection method that verifies the custom query contains the "segments.date" field. It won't allow adding time-bound conditions to custom queries.

if not where_cond:
# There is no where condition, insert new one
where_location = len(result_query)
keywords = CustomQuery.KEYWORDS_EXPR.search(result_query)
if keywords:
# where condition is not at the end of expression, insert new condition before keyword begins.
where_location = keywords.start()
result_query = (
result_query[0:where_location]
+ f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
+ result_query[where_location:]
)
return result_query
# There is already where condition, add segments.date expression
where_cond = where_cond.group(0)
keywords = CustomQuery.KEYWORDS_EXPR.search(where_cond)
if keywords:
# There is some keywords after WHERE condition
where_cond = where_cond[0 : keywords.start()]
new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
result_query = result_query.replace(where_cond, new_where_cond)
return result_query
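The date-window injection above can be reproduced as a standalone sketch using the same regexes, so its two branches (no WHERE clause vs. existing WHERE clause) can be exercised outside the connector:

```python
import re

# Same regexes as in the CustomQuery class above.
RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
KEYWORDS_EXPR = re.compile("(order by|limit|parameters)", flags=RE_FLAGS)

def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
    # Append segments.date to the column list.
    columns = SELECT_EXPR.search(query)
    if not columns:
        raise ValueError("Invalid GAQL expression")
    columns = columns.group(1)
    result_query = query.replace(columns, columns + ", segments.date\n")

    where_cond = WHERE_EXPR.search(result_query)
    if not where_cond:
        # No WHERE clause: insert one before any trailing keyword.
        where_location = len(result_query)
        keywords = KEYWORDS_EXPR.search(result_query)
        if keywords:
            where_location = keywords.start()
        return (
            result_query[:where_location]
            + f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
            + result_query[where_location:]
        )
    # Existing WHERE clause: append the date window with AND,
    # keeping any trailing ORDER BY / LIMIT / PARAMETERS intact.
    where_cond = where_cond.group(0)
    keywords = KEYWORDS_EXPR.search(where_cond)
    if keywords:
        where_cond = where_cond[: keywords.start()]
    new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
    return result_query.replace(where_cond, new_where_cond)
```

For example, "SELECT campaign.id FROM campaign" gains a new WHERE clause, while a query that already has WHERE and ORDER BY gets the date condition ANDed in before the ORDER BY.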
Expand Up @@ -22,6 +22,7 @@
# SOFTWARE.
#


from enum import Enum
from typing import Any, List, Mapping

@@ -60,10 +61,32 @@ def send_request(self, query: str) -> SearchGoogleAdsResponse:

return self.ga_service.search(search_request)

def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
"""
Issue Google API request to get detailed information on data type for custom query columns.
:param fields list of columns for the user-defined query.
:return dict of fields type info.
"""

ga_field_service = self.client.get_service("GoogleAdsFieldService")
request = self.client.get_type("SearchGoogleAdsFieldsRequest")
request.page_size = len(fields)
fields_sql = ",".join([f"'{field}'" for field in fields])
request.query = f"""
SELECT
name,
data_type,
enum_values,
is_repeated
WHERE name in ({fields_sql})
"""
response = ga_field_service.search_google_ads_fields(request=request)
return {r.name: r for r in response}
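The query string assembled in get_fields_metadata above can be sketched on its own (string building only; the GoogleAdsFieldService call is omitted, and the helper name is an assumption for illustration):

```python
from typing import List

def build_fields_metadata_query(fields: List[str]) -> str:
    # Quote each field name and join into the IN (...) list,
    # mirroring the request.query built above.
    fields_sql = ",".join(f"'{field}'" for field in fields)
    return (
        "SELECT name, data_type, enum_values, is_repeated "
        f"WHERE name in ({fields_sql})"
    )
```

The page_size is set to len(fields) above because the request asks for exactly one metadata row per requested field.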

@staticmethod
def get_fields_from_schema(schema: Mapping[str, Any]) -> List[str]:
properties = schema.get("properties")
return [*properties]
return list(properties.keys())
Contributor

What if the schema doesn't have properties?

@vovavovavovavova (Contributor Author) Aug 11, 2021

An error will be raised, but the same error would also have been raised by the previous implementation (return [*properties]). We always had properties in the predefined schemas in the schemas folder. (Now this method is used only for predefined schemas, not the custom ones.)

Contributor Author

Also, in case we have no properties in the schema (as we used to), I parse the raw query. Getting the attributes is mandatory; we cannot perform read properly without them.

Contributor Author

Outdated; now everything for read is taken from get_json_schema.


@staticmethod
def convert_schema_into_query(
@@ -82,7 +105,7 @@ def convert_schema_into_query(
return query_template

@staticmethod
def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
def get_field_value(field_value: GoogleAdsRow, field: str, schema_type: Mapping[str, Any]) -> str:
field_name = field.split(".")
for level_attr in field_name:
"""
@@ -130,7 +153,6 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
# In GoogleAdsRow there are attributes that add an underscore at the end in their name.
# For example, 'ad_group_ad.ad.type' is replaced by 'ad_group_ad.ad.type_'.
field_value = getattr(field_value, level_attr + "_", None)

if isinstance(field_value, Enum):
field_value = field_value.name
elif isinstance(field_value, (Repeated, RepeatedComposite)):
@@ -144,13 +166,23 @@
# For example:
# 1. ad_group_ad.ad.responsive_display_ad.long_headline - type AdTextAsset (https://developers.google.com/google-ads/api/reference/rpc/v6/AdTextAsset?hl=en).
# 2. ad_group_ad.ad.legacy_app_install_ad - type LegacyAppInstallAdInfo (https://developers.google.com/google-ads/api/reference/rpc/v7/LegacyAppInstallAdInfo?hl=en).
#
if not (isinstance(field_value, (list, int, float, str, bool, dict)) or field_value is None):
field_value = str(field_value)
# If a custom query field has the MESSAGE type it represents a protobuf
# message and could be anything; convert it to a string, or an array of
# strings if it has the "repeated" flag in its metadata.
if schema_type.get("protobuf_message"):
if "array" in schema_type.get("type"):
field_value = [str(field) for field in field_value]
else:
field_value = str(field_value)

return field_value

@staticmethod
def parse_single_result(schema: Mapping[str, Any], result: GoogleAdsRow):
props = schema.get("properties")
fields = GoogleAds.get_fields_from_schema(schema)
single_record = {field: GoogleAds.get_field_value(result, field) for field in fields}
single_record = {field: GoogleAds.get_field_value(result, field, props.get(field)) for field in fields}
return single_record
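The protobuf_message branch of get_field_value above can be isolated into a small sketch. The function name is hypothetical; the logic mirrors the conversion added in this diff:

```python
def convert_protobuf_value(field_value, schema_type: dict):
    # MESSAGE-typed values are stringified: element by element when the
    # schema marks the field as an array, otherwise as a whole.
    if schema_type.get("protobuf_message"):
        if "array" in schema_type.get("type", []):
            return [str(item) for item in field_value]
        return str(field_value)
    # Non-message values pass through unchanged.
    return field_value
```

This is why parse_single_result now passes each field's schema entry into get_field_value: the converter needs the "protobuf_message" flag and the declared type to decide how to serialize.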