🎉 Google Ads improvement: Support user-specified queries #5302

Merged (39 commits), Sep 10, 2021

Commits:
7ba083b
save the state
vovavovavovavova Aug 5, 2021
0b57c89
save big-big-big draft with dirty pre-upgrade
vovavovavovavova Aug 10, 2021
7ba9a5c
save draft
vovavovavovavova Aug 10, 2021
0b343da
rm prints and schemas on fly
vovavovavovavova Aug 10, 2021
cbb274a
fix
vovavovavovavova Aug 10, 2021
7c4a212
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 10, 2021
a4ddc06
update spec
vovavovavovavova Aug 10, 2021
5a27b95
cleanup
vovavovavovavova Aug 10, 2021
00b9fa1
upgrade configured_catalog for tests
vovavovavovavova Aug 10, 2021
6a2d3d7
rm previously used (now not) parameter
vovavovavovavova Aug 10, 2021
7e4c397
upd suggestion
vovavovavovavova Aug 10, 2021
2c9efa3
format && cleanup imports
vovavovavovavova Aug 10, 2021
b297a6a
remove outdated
vovavovavovavova Aug 10, 2021
80c6e2d
dedup code
vovavovavovavova Aug 11, 2021
99c8e02
dedup 2 lines
vovavovavovavova Aug 11, 2021
4e17d84
clean
vovavovavovavova Aug 11, 2021
c7256c6
Update airbyte-integrations/connectors/source-google-ads/source_googl…
vovavovavovavova Aug 11, 2021
c31c3fd
rm discovery modification; apply get_json_schema instead.
vovavovavovavova Aug 11, 2021
78adddf
upgrade spec; query with field separator
vovavovavovavova Aug 11, 2021
e17a320
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 11, 2021
14e80e1
move def process_query inside get_fields_from_schema
vovavovavovavova Aug 11, 2021
f43c29f
re-write to incremental with 2 moments
vovavovavovavova Aug 13, 2021
5775511
add pattern for basic query validation
vovavovavovavova Aug 13, 2021
a7abb83
add other resources than campaign
vovavovavovavova Aug 13, 2021
dd737dd
add as anytype?
vovavovavovavova Aug 13, 2021
ad8b2c0
dedup code
vovavovavovavova Aug 13, 2021
baae471
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 13, 2021
1ce8bf7
cleanup+ typo fix
vovavovavovavova Aug 13, 2021
a4685dc
merge; add a lot of resources; fix pattern(allow other resources); fi…
vovavovavovavova Aug 25, 2021
bbd678f
disable incremental in custom queries (requested)
vovavovavovavova Aug 26, 2021
7fa232f
Update airbyte-integrations/connectors/source-google-ads/source_googl…
vovavovavovavova Aug 30, 2021
a7b8916
add unit tests
vovavovavovavova Aug 31, 2021
0ba4690
rm double set
vovavovavovavova Aug 31, 2021
9aaab72
Merge remote-tracking branch 'origin/master' into valdemar/#5165_cust…
vovavovavovavova Aug 31, 2021
2260ca2
set prev version since https://github.com/googleads/google-ads-python…
vovavovavovavova Aug 31, 2021
3ee960b
Compose json schema based on metadata request
Sep 6, 2021
c5d55de
Add incremental stream support
Sep 7, 2021
612ffc3
Add gradle link on CI comment when build failed.
Sep 7, 2021
22be66e
Fix review comments
Sep 9, 2021
@@ -99,6 +99,28 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
Contributor:
What is this for?

Contributor Author (@vovavovavovavova), Aug 11, 2021:
From the user's side, the catalog command-line argument is taken from discover.
On our side, for read we use the console command python main.py read --config ... --catalog ...
This entry is there to fill in that command so we can run it and get data from the source.

"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["campaign.start_date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["campaign.start_date"]
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["customer.id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
@@ -63,6 +63,28 @@
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "happytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["campaign.start_date"]
},
"sync_mode": "incremental",
"destination_sync_mode": "append",
"cursor_field": ["campaign.start_date"]
},
{
"stream": {
"name": "unhappytable",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_primary_key": [["customer.id"]]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
@@ -22,6 +22,7 @@
# SOFTWARE.
#


from enum import Enum
from typing import Any, List, Mapping

@@ -62,6 +63,7 @@ def send_request(self, query: str) -> SearchGoogleAdsResponse:
@staticmethod
def get_fields_from_schema(schema: Mapping[str, Any]) -> List[str]:
properties = schema.get("properties")
# return list(properties.keys()) would be clearer?
return [*properties]
Contributor:
Suggested change
return [*properties]
return list(properties.keys())


@staticmethod
@@ -148,7 +150,17 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
return field_value

@staticmethod
def parse_single_result(schema: Mapping[str, Any], result: GoogleAdsRow):
fields = GoogleAds.get_fields_from_schema(schema)
def process_query(query) -> List:
Contributor:
Annotations everywhere, please.

query = query.lower().split("select")[1].split("from")[0].strip()
fields = query.split(",")
fields = [i.strip() for i in fields]
return fields

@staticmethod
def parse_single_result(schema: Mapping[str, Any], result: GoogleAdsRow, query: str = None):
if not query:
fields = GoogleAds.get_fields_from_schema(schema)
else:
fields = GoogleAds.process_query(query)
Contributor:
This should be part of the schema, i.e. get_json_schema should return the correct schema.

Contributor:
You've changed parse_single_result only to call process_query. Why can't you do this directly?

Contributor Author:
The second part (parsing the Google protobuf response, a structure that gives no way to list the response fields) requires those field names in order to call getattr. In this implementation, if the schema comes from the schemas folder, the if branch is used; otherwise the attributes are taken from the query. If I called it directly, I would have to duplicate the getattr code (in the function below).

Contributor Author:
This is outdated; it has been moved to get_fields_from_schema.

single_record = {field: GoogleAds.get_field_value(result, field) for field in fields}
return single_record
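
For illustration, a minimal sketch of the SELECT-clause parsing used above, assuming the query follows the plain "SELECT <fields> FROM <resource>" GAQL shape (the sample query below is made up):

from typing import List

def process_query(query: str) -> List[str]:
    # Take everything between SELECT and FROM, then split the field list on commas.
    select_clause = query.lower().split("select")[1].split("from")[0].strip()
    return [field.strip() for field in select_clause.split(",")]

fields = process_query("SELECT campaign.id, campaign.start_date FROM campaign")
# fields == ["campaign.id", "campaign.start_date"]; parse_single_result then uses
# these names for getattr-style lookups on each GoogleAdsRow.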
@@ -23,10 +23,10 @@
#


from typing import Any, List, Mapping, Tuple
from typing import Any, List, Mapping, MutableMapping, Tuple

from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import AirbyteCatalog, AirbyteStream, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from google.ads.googleads.errors import GoogleAdsException
@@ -39,13 +39,52 @@
AdGroupAds,
AdGroups,
Campaigns,
CustomQuery,
CustomQueryFullRefresh,
CustomQueryIncremental,
DisplayKeywordPerformanceReport,
DisplayTopicsPerformanceReport,
ShoppingPerformanceReport,
)


class SourceGoogleAds(AbstractSource):
def get_local_json_schema(self, config) -> MutableMapping[str, Any]:
"""
As agreed, for now this returns the default schema (since read -> schema_generator.py may take hours for the end user).
If we want to redesign the JSON schema from the raw query, this method needs to be modified.
"""
local_json_schema = {"$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "additionalProperties": True}
return local_json_schema

def discover(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteCatalog:
Contributor:
You don't need to override this method if you implement the streams method properly and return a meaningful schema in get_json_schema.

# streams = [stream.as_airbyte_stream() for stream in self.streams(config=config)]
streams = []
for stream in self.streams(config=config):
if not isinstance(stream, (CustomQueryFullRefresh, CustomQueryIncremental)):
Contributor:
Checking for CustomQuery is enough, but in general the whole check is not needed; see my other comments.

streams.append(stream.as_airbyte_stream())
# TODO: extend with custom defined streams
Contributor:
I'm not sure you actually did this here; is this still a relevant comment?

Contributor Author:
Just a marker of where the default discover logic was extended; it will be removed.

Contributor:
You don't need to change it at all; just implement this in streams.

for usr_query in config.get("custom_query", []):
Contributor:
Why don't you do this inside the CustomQueryFullRefresh and CustomQueryIncremental classes?

Contributor Author (@vovavovavovavova), Aug 10, 2021:
We need this to happen in discover, to fill the schemas (dynamically, which is very important).

Contributor:
What stops you from doing this in streams??? This is what get_json_schema is for.

local_cursor_field = (
[usr_query.get("cursor_field")] if isinstance(usr_query.get("cursor_field"), str) else usr_query.get("cursor_field")
Contributor:
I think it's better to set the specification properly.

Contributor Author:
The cursor field is already a property if we have a generic incremental stream.

)
stream = AirbyteStream(
name=usr_query["table_name"],
json_schema=self.get_local_json_schema(config=config),
supported_sync_modes=[SyncMode.full_refresh],
)
if usr_query.get("cursor_field"):
stream.source_defined_cursor = True # ???
stream.supported_sync_modes.append(SyncMode.incremental) # type: ignore
stream.default_cursor_field = local_cursor_field

keys = Stream._wrapped_primary_key(usr_query.get("primary_key") or None) # (!!! read empty strings as null as well)
if keys and len(keys) > 0:
stream.source_defined_primary_key = keys
streams.append(stream)
Contributor:
Why are the properties of the custom streams missing?

Contributor Author:
They are defined when we create the CustomQueryX instance; otherwise the connector would not be able to work (and it does work).

# end of TODO
return AirbyteCatalog(streams=streams)

def get_credentials(self, config: Mapping[str, Any]) -> Mapping[str, Any]:
credentials = config["credentials"]

@@ -69,6 +108,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
incremental_stream_config = dict(
api=google_api, conversion_window_days=config["conversion_window_days"], start_date=config["start_date"]
)

custom_query_streams = [
Contributor:
Is it possible to validate custom queries in the check_connection method?

Contributor:
Maybe we can use this for validating queries.

Contributor Author (@vovavovavovavova), Aug 13, 2021:
I used a regex in spec.json, since the Google query validator works on the client (UI JavaScript) side.

CustomQuery(custom_query_config=config["custom_query"][i], **incremental_stream_config)
for i in range(len(config.get("custom_query", [])))
Contributor:
Why do we need range here???

]
return [
AccountPerformanceReport(**incremental_stream_config),
DisplayTopicsPerformanceReport(**incremental_stream_config),
@@ -79,4 +123,4 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
AdGroups(api=google_api),
Accounts(api=google_api),
Campaigns(api=google_api),
]
] + custom_query_streams
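
As a rough sketch of the reviewers' suggestion (not the code in this PR): the discover override could be dropped by letting the custom-query stream describe itself through get_json_schema. The class name below is hypothetical and the import path is assumed:

from typing import Any, Mapping

from source_google_ads.google_ads import GoogleAds  # module path assumed

class CustomQuerySchemaSketch:
    def __init__(self, custom_query_config: Mapping[str, Any]):
        self.custom_query_config = custom_query_config
        self.user_defined_query = custom_query_config["query"]

    def get_json_schema(self) -> Mapping[str, Any]:
        # Without a metadata request the real field types are unknown,
        # so every selected field is typed permissively.
        fields = GoogleAds.process_query(self.user_defined_query)
        return {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "additionalProperties": True,
            "properties": {field: {"type": ["string", "null"]} for field in fields},
        }

With the default discover(), the CDK calls as_airbyte_stream() on every stream returned by streams(config), so a schema built this way would be picked up without touching AirbyteCatalog directly.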
@@ -67,6 +67,31 @@
"maximum": 1095,
"default": 14,
"examples": [14]
},
"custom_query": {
"type": "array",
Contributor:
Suggested change
"type": "array",
"type": "array",
"title": "Custom GAQL Queries"

"items": {
"type": ["object", "null"],
Contributor:
query and table_name should be required

"properties": {
"query": {
"type": "string",
"title": "User defined query to build a report by wish",
Contributor:
Suggested change
"title": "User defined query to build a report by wish",
"description": "A custom defined GAQL query for building the report.",

"examples": ["SELECT segments.ad_destination_type FROM campaign"]
Contributor:
If the user wants to select several fields, how should they be separated?

Contributor Author:
Comma-separated; I will update the example and add one more field.

},
"primary_key": {
"type": ["string", "null"],
"title": "The unique field to be used as primary key in destination database (if provided)"
},
"cursor_field": {
"type": ["string", "null"],
"title": "If not provided, the FULL-REFRESH model will be used. If provided, this will be an incremental stream with this cursor field. Please use datetime fields (start_date, end_date) for proper work."
},
"table_name": {
"type": "string",
"title": "The table name in your destination database for choosen query."
}
}
}
}
}
}
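
For illustration only, a single custom_query entry matching this spec could look like the following (all values are made up; cursor_field may be omitted to get a full-refresh stream):

custom_query_entry = {
    "query": "SELECT campaign.id, campaign.name, campaign.start_date FROM campaign",
    "table_name": "campaign_custom_report",
    "cursor_field": "campaign.start_date",
    "primary_key": "campaign.id",
}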
@@ -192,3 +192,73 @@ class ShoppingPerformanceReport(IncrementalGoogleAdsStream):
ShoppingPerformanceReport stream: https://developers.google.com/google-ads/api/fields/v8/shopping_performance_view
Google Ads API field mapping: https://developers.google.com/google-ads/api/docs/migration/mapping#shopping_performance
"""


class CustomQueryFullRefresh(GoogleAdsStream):
"""
Class that should sync by custom user query to Google Ads API
Fixme: check if WHERE>start_date was applied in standard fullrefresh stream. If yes, reapply here.
"""

def __init__(self, custom_query_config, **kwargs):
self.custom_query_config = custom_query_config
Contributor:
A lot of duplication; no need to create two separate classes. Control behaviour with config.

self.user_defined_query = custom_query_config["query"]
super().__init__(api=kwargs["api"])

@property
def primary_key(self) -> str:
return self.custom_query_config.get("primary_key") or None # not empty strings

@property
def name(self):
return self.custom_query_config["table_name"]

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
return self.user_defined_query

def parse_response(self, response: SearchPager) -> Iterable[Mapping]:
for result in response:
yield self.google_ads_client.parse_single_result(schema=None, result=result, query=self.user_defined_query)


class CustomQueryIncremental(IncrementalGoogleAdsStream):
"""
Class that should sync by custom user query to Google Ads API
"""

def __init__(self, custom_query_config, **kwargs):
self.custom_query_config = custom_query_config
self.user_defined_query = custom_query_config["query"]
super().__init__(**kwargs)

@property
def primary_key(self) -> str:
return self.custom_query_config.get("primary_key") or None

@property
def cursor_field(self) -> str:
return self.custom_query_config["cursor_field"]

@property
def name(self):
return self.custom_query_config["table_name"]

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
start_date, end_date = self.get_date_params(stream_slice, self.cursor_field)
final_query = (
self.user_defined_query
+ f"\nWHERE {self.cursor_field} > '{start_date}' AND {self.cursor_field} < '{end_date}' ORDER BY {self.cursor_field} ASC"
)
return final_query

def parse_response(self, response: SearchPager) -> Iterable[Mapping]:
for result in response:
yield self.google_ads_client.parse_single_result(schema=None, result=result, query=self.user_defined_query)


class CustomQuery:
def __new__(cls, *args, **kwargs):
if kwargs.get("custom_query_config", {}).get("cursor_field"):
return CustomQueryIncremental(*args, **kwargs)
else:
return CustomQueryFullRefresh(*args, **kwargs)
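
For illustration, the factory above would route a hypothetical config entry like this (google_api stands for an already-initialized GoogleAds client; dates are placeholders):

stream = CustomQuery(
    custom_query_config={
        "query": "SELECT campaign.id, campaign.start_date FROM campaign",
        "table_name": "happytable",
        "cursor_field": "campaign.start_date",
    },
    api=google_api,
    conversion_window_days=14,
    start_date="2021-01-01",
)
# cursor_field is set, so CustomQueryIncremental is returned; for a slice covering
# January 2021, get_query() would append something like:
#   WHERE campaign.start_date > '2021-01-01' AND campaign.start_date < '2021-01-31'
#   ORDER BY campaign.start_date ASC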