From 1fa4461c3479f93441d1e9787a8a2f06fd7d93f5 Mon Sep 17 00:00:00 2001 From: vladimir Date: Tue, 20 Jul 2021 16:40:42 +0200 Subject: [PATCH 1/8] Connector Facebook-Marketing: update streams with custom streams --- .../source_facebook_marketing/source.py | 42 ++++++++++++++++-- .../source_facebook_marketing/streams.py | 44 +++++++++++++++++++ 2 files changed, 83 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index f04806dda92a..10c604e1d111 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -23,10 +23,16 @@ # from datetime import datetime -from typing import Any, List, Mapping, Tuple, Type +from typing import Any, List, Mapping, Tuple, Type, Optional +from airbyte_cdk.entrypoint import logger -from airbyte_cdk.models import ConnectorSpecification, DestinationSyncMode +from airbyte_cdk.models import ( + ConnectorSpecification, + DestinationSyncMode, + AirbyteCatalog +) from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams import Stream from pydantic import BaseModel, Field from source_facebook_marketing.api import API @@ -41,6 +47,13 @@ AdsInsightsPlatformAndDevice, AdsInsightsRegion, Campaigns, + CustomAdsInsights, + CustomAdsInsightsAgeAndGender, + CustomAdsInsightsCountry, + CustomAdsInsightsRegion, + CustomAdsInsightsDma, + CustomAdsInsightsPlatformAndDevice, + ) @@ -76,6 +89,7 @@ class Config: minimum=1, maximum=30, ) + custom_insights_fields: Optional[str] = Field(description="A list of fields separate by commas describing the custom fields you want to sync from Facebook-AdsInsights") class SourceFacebookMarketing(AbstractSource): @@ -113,7 +127,7 @@ 
def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: days_per_job=config.insights_days_per_job, ) - return [ + streams = [ Campaigns(api=api, start_date=config.start_date, include_deleted=config.include_deleted), AdSets(api=api, start_date=config.start_date, include_deleted=config.include_deleted), Ads(api=api, start_date=config.start_date, include_deleted=config.include_deleted), @@ -126,6 +140,10 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdsInsightsPlatformAndDevice(**insights_args), ] + streams = self._add_custom_adsinsights_streams(config=config, args=insights_args, streams=streams) + + return streams + def spec(self, *args, **kwargs) -> ConnectorSpecification: """ Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) @@ -138,3 +156,21 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: supported_destination_sync_modes=[DestinationSyncMode.append], connectionSpecification=ConnectorConfig.schema(), ) + + def _add_custom_adsinsights_streams(self, config, args, streams) -> List[Type[Stream]]: + """ Update method, returns streams plus custom streams + After we checked if 'custom_insights_fields' exists we add the custom streams with the + fields that we setted in the config + """ + adsinsights_selected_fields = config.custom_insights_fields + if adsinsights_selected_fields: + args['list_selected_fields'] = [field.strip() for field in adsinsights_selected_fields.split(',') if field and field.strip() !=''] + streams += [ + CustomAdsInsights(**args), + CustomAdsInsightsAgeAndGender(**args), + CustomAdsInsightsCountry(**args), + CustomAdsInsightsRegion(**args), + CustomAdsInsightsDma(**args), + CustomAdsInsightsPlatformAndDevice(**args)] + + return streams diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py 
b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index 1c375bb4f7bb..61f893c17b4d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -479,3 +479,47 @@ class AdsInsightsDma(AdsInsights): class AdsInsightsPlatformAndDevice(AdsInsights): breakdowns = ["publisher_platform", "platform_position", "impression_device"] action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns + + +class CustomAdsInsights(AdsInsights): + + def __init__(self, list_selected_fields, **kwargs): + super().__init__(**kwargs) + self._list_selected_fields = list_selected_fields + + def get_json_schema(self) -> Mapping[str, Any]: + """Add fields from breakdowns to the stream schema + :return: A dict of the JSON schema representing this stream. + """ + schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") + schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._list_selected_fields} + schema["properties"].update(self._schema_for_breakdowns()) + return schema + + @cached_property + def fields(self) -> List[str]: + """List of fields that we want to query""" + schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") + schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._list_selected_fields} + return list(schema.get("properties", {}).keys()) + + +class CustomAdsInsightsAgeAndGender(CustomAdsInsights): + breakdowns = ["age", "gender"] + + +class CustomAdsInsightsCountry(CustomAdsInsights): + breakdowns = ["country"] + + +class CustomAdsInsightsRegion(CustomAdsInsights): + breakdowns = ["region"] + + +class CustomAdsInsightsDma(CustomAdsInsights): + breakdowns = ["dma"] + + +class 
CustomAdsInsightsPlatformAndDevice(CustomAdsInsights): + breakdowns = ["publisher_platform", "platform_position", "impression_device"] + action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns From 4cf644e33b2a0b0e8f06290d76972039cbbf4b24 Mon Sep 17 00:00:00 2001 From: vladimir Date: Wed, 22 Sep 2021 13:12:12 +0200 Subject: [PATCH 2/8] update: remove custom streams, add new custom insights from config --- .../source_facebook_marketing/source.py | 42 ++++++----- .../source_facebook_marketing/streams.py | 73 +++++++------------ 2 files changed, 50 insertions(+), 65 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 10c604e1d111..da7d49ce71ae 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -21,7 +21,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
# - +import json from datetime import datetime from typing import Any, List, Mapping, Tuple, Type, Optional from airbyte_cdk.entrypoint import logger @@ -34,7 +34,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams import Stream -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, Json from source_facebook_marketing.api import API from source_facebook_marketing.streams import ( AdCreatives, @@ -89,7 +89,7 @@ class Config: minimum=1, maximum=30, ) - custom_insights_fields: Optional[str] = Field(description="A list of fields separate by commas describing the custom fields you want to sync from Facebook-AdsInsights") + custom_insights: Optional[Json] = Field(description="A json objet with custom insights") class SourceFacebookMarketing(AbstractSource): @@ -132,6 +132,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdSets(api=api, start_date=config.start_date, include_deleted=config.include_deleted), Ads(api=api, start_date=config.start_date, include_deleted=config.include_deleted), AdCreatives(api=api), + AdsInsights(**insights_args), AdsInsightsAgeAndGender(**insights_args), AdsInsightsCountry(**insights_args), @@ -140,9 +141,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdsInsightsPlatformAndDevice(**insights_args), ] - streams = self._add_custom_adsinsights_streams(config=config, args=insights_args, streams=streams) - - return streams + return self._add_custom_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams) def spec(self, *args, **kwargs) -> ConnectorSpecification: """ @@ -157,20 +156,23 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: connectionSpecification=ConnectorConfig.schema(), ) - def _add_custom_adsinsights_streams(self, config, args, streams) -> List[Type[Stream]]: + def _add_custom_insights_streams(self, insights, args, streams) -> 
List[Type[Stream]]: """ Update method, returns streams plus custom streams After we checked if 'custom_insights_fields' exists we add the custom streams with the - fields that we setted in the config + fields that we setted in the confi """ - adsinsights_selected_fields = config.custom_insights_fields - if adsinsights_selected_fields: - args['list_selected_fields'] = [field.strip() for field in adsinsights_selected_fields.split(',') if field and field.strip() !=''] - streams += [ - CustomAdsInsights(**args), - CustomAdsInsightsAgeAndGender(**args), - CustomAdsInsightsCountry(**args), - CustomAdsInsightsRegion(**args), - CustomAdsInsightsDma(**args), - CustomAdsInsightsPlatformAndDevice(**args)] - - return streams + insights_custom_streams = list() + for insight_entry in insights.get('insights'): + args['name'] = insight_entry.get('name') + args['fields'] = insight_entry.get('fields') + args['breakdowns'] = insight_entry.get('breakdowns') + args['action_breakdowns'] = insight_entry.get('action_breakdowns') + insight_stream = AdsInsights(**args) + insights_custom_streams.append(insight_stream) + + new_streams = list() + for stream in streams: + if stream.name not in [e.name for e in insights_custom_streams]: + new_streams.append(stream) + + return new_streams + insights_custom_streams diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index 61f893c17b4d..1da69c5b7b60 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -31,6 +31,8 @@ import backoff import pendulum +import airbyte_cdk.sources.utils.casing as casing + from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.core import 
package_name_from_class @@ -305,10 +307,31 @@ class AdsInsights(FBMarketingIncrementalStream): breakdowns = [] - def __init__(self, buffer_days, days_per_job, **kwargs): + def __init__( + self, + buffer_days, + days_per_job, + name: str = None, + fields: List[str] = None, + breakdowns: List[str] = None, + action_breakdowns: List[str] = None, + **kwargs): + super().__init__(**kwargs) self.lookback_window = pendulum.duration(days=buffer_days) self._days_per_job = days_per_job + self._fields = fields + self.action_breakdowns = action_breakdowns or self.action_breakdowns + self.breakdowns = breakdowns or self.breakdowns + self._new_class_name = name + + @property + def name(self) -> str: + """ + :return: Stream name. By default this is the implementing class name, but it can be overridden as needed. + """ + name = self._new_class_name or self.__class__.__name__ + return casing.camel_to_snake(name) def read_records( self, @@ -403,12 +426,16 @@ def get_json_schema(self) -> Mapping[str, Any]: :return: A dict of the JSON schema representing this stream. 
""" schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") + if self._fields: + schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._fields} schema["properties"].update(self._schema_for_breakdowns()) return schema @cached_property def fields(self) -> List[str]: """List of fields that we want to query, for now just all properties from stream's schema""" + if self._fields: + return self._fields schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") return list(schema.get("properties", {}).keys()) @@ -479,47 +506,3 @@ class AdsInsightsDma(AdsInsights): class AdsInsightsPlatformAndDevice(AdsInsights): breakdowns = ["publisher_platform", "platform_position", "impression_device"] action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns - - -class CustomAdsInsights(AdsInsights): - - def __init__(self, list_selected_fields, **kwargs): - super().__init__(**kwargs) - self._list_selected_fields = list_selected_fields - - def get_json_schema(self) -> Mapping[str, Any]: - """Add fields from breakdowns to the stream schema - :return: A dict of the JSON schema representing this stream. 
- """ - schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") - schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._list_selected_fields} - schema["properties"].update(self._schema_for_breakdowns()) - return schema - - @cached_property - def fields(self) -> List[str]: - """List of fields that we want to query""" - schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") - schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._list_selected_fields} - return list(schema.get("properties", {}).keys()) - - -class CustomAdsInsightsAgeAndGender(CustomAdsInsights): - breakdowns = ["age", "gender"] - - -class CustomAdsInsightsCountry(CustomAdsInsights): - breakdowns = ["country"] - - -class CustomAdsInsightsRegion(CustomAdsInsights): - breakdowns = ["region"] - - -class CustomAdsInsightsDma(CustomAdsInsights): - breakdowns = ["dma"] - - -class CustomAdsInsightsPlatformAndDevice(CustomAdsInsights): - breakdowns = ["publisher_platform", "platform_position", "impression_device"] - action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns From ab6fc13a3aeb956592fc47bf210924ee8e236c1a Mon Sep 17 00:00:00 2001 From: vladimir Date: Wed, 29 Sep 2021 13:12:31 +0200 Subject: [PATCH 3/8] update: add new model for InsightConfig, remove old imports --- .../source_facebook_marketing/source.py | 59 ++++++++++++------- 1 file changed, 39 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index da7d49ce71ae..bb61ae6261ad 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ 
-34,7 +34,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams import Stream -from pydantic import BaseModel, Field, Json +from pydantic import BaseModel, Field from source_facebook_marketing.api import API from source_facebook_marketing.streams import ( AdCreatives, @@ -46,17 +46,29 @@ AdsInsightsDma, AdsInsightsPlatformAndDevice, AdsInsightsRegion, - Campaigns, - CustomAdsInsights, - CustomAdsInsightsAgeAndGender, - CustomAdsInsightsCountry, - CustomAdsInsightsRegion, - CustomAdsInsightsDma, - CustomAdsInsightsPlatformAndDevice, - + Campaigns ) +class InsightConfig(BaseModel): + + name: str = Field( + description='The name value of insight' + ) + + fields: Optional[List[str]] = Field( + description='A list of chosen fields for fields parameter' + ) + + breakdowns: Optional[List[str]] = Field( + description='A list of chosen breakdowns for breakdowns' + ) + + action_breakdowns: Optional[List[str]] = Field( + description='A list of chosen action_breakdowns for action_breakdowns' + ) + + class ConnectorConfig(BaseModel): class Config: title = "Source Facebook Marketing" @@ -89,7 +101,10 @@ class Config: minimum=1, maximum=30, ) - custom_insights: Optional[Json] = Field(description="A json objet with custom insights") + insights: Optional[List[InsightConfig]] = Field( + description="A defined list wich contains insights entries, each entry must have a name and can contain these entries(fields, breakdowns or action_breakdowns)", + examples=["[{\"name\": \"AdsInsights\",\"fields\": [\"account_id\",\"account_name\",\"ad_id\",\"ad_name\",\"adset_id\",\"adset_name\",\"campaign_id\",\"campaign_name\",\"date_start\",\"impressions\",\"spend\"],\"breakdowns\": [],\"action_breakdowns\": []}]"] + ) class SourceFacebookMarketing(AbstractSource): @@ -141,7 +156,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdsInsightsPlatformAndDevice(**insights_args), ] - return 
self._add_custom_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams) + return self._update_insights_streams(insights=config.insights, args=insights_args, streams=streams) def spec(self, *args, **kwargs) -> ConnectorSpecification: """ @@ -156,17 +171,21 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: connectionSpecification=ConnectorConfig.schema(), ) - def _add_custom_insights_streams(self, insights, args, streams) -> List[Type[Stream]]: - """ Update method, returns streams plus custom streams - After we checked if 'custom_insights_fields' exists we add the custom streams with the - fields that we setted in the confi + def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]: + """ Update method, if insights have values returns streams replacing the + default insights streams else returns streams + """ + if not insights: + return streams + insights_custom_streams = list() - for insight_entry in insights.get('insights'): - args['name'] = insight_entry.get('name') - args['fields'] = insight_entry.get('fields') - args['breakdowns'] = insight_entry.get('breakdowns') - args['action_breakdowns'] = insight_entry.get('action_breakdowns') + + for insight in insights: + args['name'] = insight.name + args['fields'] = insight.fields + args['breakdowns'] = insight.breakdowns + args['action_breakdowns'] = insight.action_breakdowns insight_stream = AdsInsights(**args) insights_custom_streams.append(insight_stream) From 2338a0ffe23cd87b3bffdede15a9772840364f5a Mon Sep 17 00:00:00 2001 From: vladimir Date: Mon, 4 Oct 2021 17:01:33 +0200 Subject: [PATCH 4/8] fix: format to source file and streams file --- .../source_facebook_marketing/source.py | 41 ++++++++----------- .../source_facebook_marketing/streams.py | 8 ++-- 2 files changed, 21 insertions(+), 28 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py 
b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 0b6e0998ab3e..7add62ae6224 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -1,15 +1,15 @@ # # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # + import json from datetime import datetime -from typing import Any, List, Mapping, Tuple, Type, Optional -from airbyte_cdk.entrypoint import logger +from typing import Any, List, Mapping, Optional, Tuple, Type +from airbyte_cdk.entrypoint import logger +from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification - from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.sources.streams import Stream from pydantic import BaseModel, Field from source_facebook_marketing.api import API @@ -24,27 +24,19 @@ AdsInsightsDma, AdsInsightsPlatformAndDevice, AdsInsightsRegion, - Campaigns + Campaigns, ) class InsightConfig(BaseModel): - name: str = Field( - description='The name value of insight' - ) + name: str = Field(description="The name value of insight") - fields: Optional[List[str]] = Field( - description='A list of chosen fields for fields parameter' - ) + fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter") - breakdowns: Optional[List[str]] = Field( - description='A list of chosen breakdowns for breakdowns' - ) + breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns") - action_breakdowns: Optional[List[str]] = Field( - description='A list of chosen action_breakdowns for action_breakdowns' - ) + action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns") class 
ConnectorConfig(BaseModel): @@ -81,7 +73,9 @@ class Config: ) insights: Optional[List[InsightConfig]] = Field( description="A defined list wich contains insights entries, each entry must have a name and can contain these entries(fields, breakdowns or action_breakdowns)", - examples=["[{\"name\": \"AdsInsights\",\"fields\": [\"account_id\",\"account_name\",\"ad_id\",\"ad_name\",\"adset_id\",\"adset_name\",\"campaign_id\",\"campaign_name\",\"date_start\",\"impressions\",\"spend\"],\"breakdowns\": [],\"action_breakdowns\": []}]"] + examples=[ + '[{"name": "AdsInsights","fields": ["account_id","account_name","ad_id","ad_name","adset_id","adset_name","campaign_id","campaign_name","date_start","impressions","spend"],"breakdowns": [],"action_breakdowns": []}]' + ], ) @@ -125,7 +119,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdSets(api=api, start_date=config.start_date, include_deleted=config.include_deleted), Ads(api=api, start_date=config.start_date, include_deleted=config.include_deleted), AdCreatives(api=api), - AdsInsights(**insights_args), AdsInsightsAgeAndGender(**insights_args), AdsInsightsCountry(**insights_args), @@ -157,7 +150,7 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: ) def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]: - """ Update method, if insights have values returns streams replacing the + """Update method, if insights have values returns streams replacing the default insights streams else returns streams """ @@ -167,10 +160,10 @@ def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream] insights_custom_streams = list() for insight in insights: - args['name'] = insight.name - args['fields'] = insight.fields - args['breakdowns'] = insight.breakdowns - args['action_breakdowns'] = insight.action_breakdowns + args["name"] = insight.name + args["fields"] = insight.fields + args["breakdowns"] = insight.breakdowns + args["action_breakdowns"] = 
insight.action_breakdowns insight_stream = AdsInsights(**args) insights_custom_streams.append(insight_stream) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index bde2d00408ea..615dcedb6064 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -9,10 +9,9 @@ from datetime import datetime from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence +import airbyte_cdk.sources.utils.casing as casing import backoff import pendulum -import airbyte_cdk.sources.utils.casing as casing - from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.core import package_name_from_class @@ -300,7 +299,8 @@ def __init__( fields: List[str] = None, breakdowns: List[str] = None, action_breakdowns: List[str] = None, - **kwargs): + **kwargs, + ): super().__init__(**kwargs) self.lookback_window = pendulum.duration(days=buffer_days) @@ -412,7 +412,7 @@ def get_json_schema(self) -> Mapping[str, Any]: """ schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") if self._fields: - schema["properties"] = {k:v for k,v in schema["properties"].items() if k in self._fields} + schema["properties"] = {k: v for k, v in schema["properties"].items() if k in self._fields} schema["properties"].update(self._schema_for_breakdowns()) return schema From 528cd44e6c6e1099154ae437647118c777e56290 Mon Sep 17 00:00:00 2001 From: vladimir Date: Tue, 5 Oct 2021 17:39:16 +0200 Subject: [PATCH 5/8] update Changelog --- docs/integrations/sources/facebook-marketing.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/facebook-marketing.md 
b/docs/integrations/sources/facebook-marketing.md index e8d02c1ded34..c8dc84d7673f 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -89,6 +89,7 @@ See Facebook's [documentation on rate limiting](https://developers.facebook.com/ | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.2.21 | 2021-10-05 | [4864](https://github.com/airbytehq/airbyte/pull/4864) | Update insights streams with custom entries for fields, breakdowns and action_breakdowns | | 0.2.20 | 2021-10-04 | [6719](https://github.com/airbytehq/airbyte/pull/6719) | Update version of facebook_bussiness package to 12.0 | | 0.2.19 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification | | 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix field values converting fail | From b47be69e7f7b0f03c78701f279b0e11bfe642668 Mon Sep 17 00:00:00 2001 From: vladimir Date: Thu, 7 Oct 2021 12:59:28 +0200 Subject: [PATCH 6/8] update: add to check a validation to insights entries, update documentation and fix to resolve in spec schema --- .../schemas/ads_insights_breakdowns.json | 38 ++++++++ .../source_facebook_marketing/source.py | 91 +++++++++++++++---- .../sources/facebook-marketing.md | 6 ++ 3 files changed, 115 insertions(+), 20 deletions(-) create mode 100644 airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights_breakdowns.json diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights_breakdowns.json b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights_breakdowns.json new file mode 100644 index 000000000000..aa54929b7ecb --- /dev/null +++ 
b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/schemas/ads_insights_breakdowns.json @@ -0,0 +1,38 @@ +{ + "properties": { + "action_device": {"type": ["null", "string"]}, + "action_canvas_component_name": {"type": ["null", "string"]}, + "action_carousel_card_id": {"type": ["null", "string"]}, + "action_carousel_card_name": {"type": ["null", "string"]}, + "action_destination": {"type": ["null", "string"]}, + "action_reaction": {"type": ["null", "string"]}, + "action_target_id": {"type": ["null", "string"]}, + "action_type": {"type": ["null", "string"]}, + "action_video_sound": {"type": ["null", "string"]}, + "action_video_type": {"type": ["null", "string"]}, + "ad_format_asset": {"type": ["null", "string"]}, + "age": {"type": ["null", "string"]}, + "app_id": {"type": ["null", "string"]}, + "body_asset": {"type": ["null", "string"]}, + "call_to_action_asset": {"type": ["null", "string"]}, + "country": {"type": ["null", "string"]}, + "description_asset": {"type": ["null", "string"]}, + "device_platform": {"type": ["null", "string"]}, + "dma": {"type": ["null", "string"]}, + "frequency_value": {"type": ["null", "string"]}, + "gender": {"type": ["null", "string"]}, + "hourly_stats_aggregated_by_advertiser_time_zone": {"type": ["null", "string"]}, + "hourly_stats_aggregated_by_audience_time_zone": {"type": ["null", "string"]}, + "image_asset": {"type": ["null", "string"]}, + "impression_device": {"type": ["null", "string"]}, + "link_url_asset": {"type": ["null", "string"]}, + "place_page_id": {"type": ["null", "string"]}, + "platform_position": {"type": ["null", "string"]}, + "product_id": {"type": ["null", "string"]}, + "publisher_platform": {"type": ["null", "string"]}, + "region": {"type": ["null", "string"]}, + "skan_conversion_id": {"type": ["null", "string"]}, + "title_asset": {"type": ["null", "string"]}, + "video_asset": {"type": ["null", "string"]} + } +} \ No newline at end of file diff --git 
a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 7add62ae6224..155ee47aef7c 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -4,13 +4,16 @@ import json from datetime import datetime -from typing import Any, List, Mapping, Optional, Tuple, Type +from typing import Any, List, Mapping, Optional, Tuple, Type, MutableMapping +from jsonschema import RefResolver from airbyte_cdk.entrypoint import logger from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification +from airbyte_cdk.models import AirbyteConnectionStatus, AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification, Status from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.core import package_name_from_class +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from pydantic import BaseModel, Field from source_facebook_marketing.api import API from source_facebook_marketing.streams import ( @@ -32,11 +35,11 @@ class InsightConfig(BaseModel): name: str = Field(description="The name value of insight") - fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter") + fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter", default=[]) - breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns") + breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns", default=[]) - action_breakdowns: Optional[List[str]] = Field(description="A list of 
chosen action_breakdowns for action_breakdowns") + action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[]) class ConnectorConfig(BaseModel): @@ -72,10 +75,7 @@ class Config: maximum=30, ) insights: Optional[List[InsightConfig]] = Field( - description="A defined list wich contains insights entries, each entry must have a name and can contain these entries(fields, breakdowns or action_breakdowns)", - examples=[ - '[{"name": "AdsInsights","fields": ["account_id","account_name","ad_id","ad_name","adset_id","adset_name","campaign_id","campaign_name","date_start","impressions","spend"],"breakdowns": [],"action_breakdowns": []}]' - ], + description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)" ) @@ -130,6 +130,20 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: return self._update_insights_streams(insights=config.insights, args=insights_args, streams=streams) + def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" + try: + check_succeeded, error = self.check_connection(logger, config) + if not check_succeeded: + return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) + except Exception as e: + return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e)) + + self._check_insights_entries(config.get('insights', [])) + + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + + def spec(self, *args, **kwargs) -> ConnectorSpecification: """ Returns the spec for this integration. 
The spec is a JSON-Schema object describing the required configurations (e.g: username and password) @@ -140,12 +154,12 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: changelogUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing", supportsIncremental=True, supported_destination_sync_modes=[DestinationSyncMode.append], - connectionSpecification=ConnectorConfig.schema(), + connectionSpecification=expand_local_ref(ConnectorConfig.schema()), authSpecification=AuthSpecification( auth_type="oauth2.0", oauth2Specification=OAuth2Specification( rootObject=[], oauthFlowInitParameters=[], oauthFlowOutputParameters=[["access_token"]] - ), + ) ), ) @@ -160,16 +174,53 @@ def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream] insights_custom_streams = list() for insight in insights: - args["name"] = insight.name - args["fields"] = insight.fields - args["breakdowns"] = insight.breakdowns - args["action_breakdowns"] = insight.action_breakdowns + args["name"] = f"Custom{insight.name}" + args["fields"] = list(set(insight.fields)) + args["breakdowns"] = list(set(insight.breakdowns)) + args["action_breakdowns"] = list(set(insight.action_breakdowns)) insight_stream = AdsInsights(**args) insights_custom_streams.append(insight_stream) - new_streams = list() - for stream in streams: - if stream.name not in [e.name for e in insights_custom_streams]: - new_streams.append(stream) + return streams + insights_custom_streams + + def _check_insights_entries(self, insights): - return new_streams + insights_custom_streams + default_fields = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights").get("properties", {}).keys()) + default_breakdowns = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights_breakdowns").get("properties", {}).keys()) + default_actions_breakdowns = [e for e in default_breakdowns if 'action_' in e] + + for insight in insights: + if 
insight.get('fields') and not self._check_values(default_fields, insight.get('fields')): + message = f"For {insight.get('name')} that field are not espected in fields" + raise Exception("Config validation error: " + message) from None + if insight.get('breakdowns') and not self._check_values(default_breakdowns, insight.get('breakdowns')): + message = f"For {insight.get('name')} that breakdown are not espected in breakdowns" + raise Exception("Config validation error: " + message) from None + if insight.get('action_breakdowns') and not self._check_values(default_actions_breakdowns, insight.get('action_breakdowns')): + message = f"For {insight.get('name')} that action_breakdowns are not espected in action_breakdowns" + raise Exception("Config validation error: " + message) from None + + return True + + def _check_values(self, default_value: List[str], custom_value: List[str]) -> bool: + for e in custom_value: + if e not in default_value: + logger.error(f"{e} does not appers in {default_value}") + return False + return True + + +def expand_local_ref(schema, resolver=None, **kwargs): + resolver = resolver or RefResolver("", schema) + if isinstance(schema, MutableMapping): + if "$ref" in schema: + ref_url = schema.pop("$ref") + url, resolved_schema = resolver.resolve(ref_url) + schema.update(resolved_schema) + for key, value in schema.items(): + schema[key] = expand_local_ref(value, resolver=resolver) + return schema + elif isinstance(schema, List): + return [expand_local_ref(item, resolver=resolver) for item in schema] + + return schema diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index c8dc84d7673f..a6a15d312bd6 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -76,6 +76,12 @@ Facebook heavily throttles API tokens generated from Facebook Apps by default, m See Facebook's [documentation on rate 
limiting](https://developers.facebook.com/docs/marketing-api/overview/authorization/#access-levels) for more information on requesting a quota upgrade. +## Custom Insights +In order to retrieve specific fields from Facebook Ads Insights combined with other breakdowns, there is a mechanism that allows you to choose which fields and breakdowns to sync. +It is highly recommended to follow the [documentation](https://developers.facebook.com/docs/marketing-api/insights/breakdowns), as there are limitations related to breakdowns. Some fields cannot be requested and many others only work when combined with specific fields; for example, the breakdown **app_id** is only supported with the **total_postbacks** field. +For now, the only check done when setting up a source is whether the fields, breakdowns and action breakdowns are among the ones provided by Facebook. That is, if you enter valid input it will pass validation, but if the subsequent calls to the Facebook API with those parameters fail, you will receive an error from the API. +In summary, custom insights allow you to replicate only some fields, resulting in a sync speed increase. 
+ #### Data type mapping | Integration Type | Airbyte Type | Notes | From 20df1e0c0724b1fc1b7b7c0b5e53bd13dfcc4e4c Mon Sep 17 00:00:00 2001 From: vladimir Date: Thu, 14 Oct 2021 11:48:42 +0200 Subject: [PATCH 7/8] fix: fix import logger from entrypoint, change to python logger --- .../source_facebook_marketing/api.py | 4 +++- .../source_facebook_marketing/common.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py index 4cdb3ebf8011..45e9db0501e9 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py @@ -6,7 +6,7 @@ from time import sleep import pendulum -from airbyte_cdk.entrypoint import logger +import logging from cached_property import cached_property from facebook_business import FacebookAdsApi from facebook_business.adobjects import user as fb_user @@ -14,6 +14,8 @@ from facebook_business.exceptions import FacebookRequestError from source_facebook_marketing.common import FacebookAPIException +logger = logging.getLogger(__name__) + class MyFacebookAdsApi(FacebookAdsApi): """Custom Facebook API class to intercept all API calls and handle call rate limits""" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py index 9570ae30701b..f0a58bdb9a88 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py @@ -7,7 +7,7 @@ import backoff import pendulum -from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): register logger 
as standard python logger +import logging from facebook_business.exceptions import FacebookRequestError # The Facebook API error codes indicating rate-limiting are listed at @@ -16,6 +16,8 @@ FACEBOOK_UNKNOWN_ERROR_CODE = 99 DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1) +logger = logging.getLogger(__name__) + class FacebookAPIException(Exception): """General class for all API errors""" From dd0033857a8900432b44bc2114c84536d9fb85fd Mon Sep 17 00:00:00 2001 From: vladimir Date: Thu, 14 Oct 2021 11:51:08 +0200 Subject: [PATCH 8/8] fix: change error message from check custom insights entries, fix logger import --- .../source_facebook_marketing/source.py | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 3a1d89a5fa98..cfef9c5c1187 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -2,12 +2,12 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
# -import json +import logging from datetime import datetime from typing import Any, List, Mapping, Optional, Tuple, Type, MutableMapping from jsonschema import RefResolver -from airbyte_cdk.entrypoint import logger + from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import AirbyteConnectionStatus, AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification, Status @@ -35,6 +35,8 @@ ) +logger = logging.getLogger(__name__) + class InsightConfig(BaseModel): name: str = Field(description="The name value of insight") @@ -85,7 +87,7 @@ class Config: minimum=1, maximum=30, ) - insights: Optional[List[InsightConfig]] = Field( + custom_insights: Optional[List[InsightConfig]] = Field( description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)" ) @@ -142,7 +144,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: AdsInsightsActionType(**insights_args), ] - return self._update_insights_streams(insights=config.insights, args=insights_args, streams=streams) + return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams) def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: """Implements the Check Connection operation from the Airbyte Specification. 
See https://docs.airbyte.io/architecture/airbyte-specification.""" @@ -153,7 +155,7 @@ def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConn except Exception as e: return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e)) - self._check_insights_entries(config.get('insights', [])) + self._check_custom_insights_entries(config.get('custom_insights', [])) return AirbyteConnectionStatus(status=Status.SUCCEEDED) @@ -197,31 +199,38 @@ def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream] return streams + insights_custom_streams - def _check_insights_entries(self, insights): + def _check_custom_insights_entries(self, insights: List[Mapping[str, Any]]): default_fields = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights").get("properties", {}).keys()) default_breakdowns = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights_breakdowns").get("properties", {}).keys()) default_actions_breakdowns = [e for e in default_breakdowns if 'action_' in e] for insight in insights: - if insight.get('fields') and not self._check_values(default_fields, insight.get('fields')): - message = f"For {insight.get('name')} that field are not espected in fields" - raise Exception("Config validation error: " + message) from None - if insight.get('breakdowns') and not self._check_values(default_breakdowns, insight.get('breakdowns')): - message = f"For {insight.get('name')} that breakdown are not espected in breakdowns" - raise Exception("Config validation error: " + message) from None - if insight.get('action_breakdowns') and not self._check_values(default_actions_breakdowns, insight.get('action_breakdowns')): - message = f"For {insight.get('name')} that action_breakdowns are not espected in action_breakdowns" - raise Exception("Config validation error: " + message) from None + if insight.get('fields'): + value_checked, value = 
self._check_values(default_fields, insight.get('fields')) + if not value_checked: + message = f"{value} is not a valid field name" + raise Exception("Config validation error: " + message) from None + if insight.get('breakdowns'): + value_checked, value = self._check_values(default_breakdowns, insight.get('breakdowns')) + if not value_checked: + message = f"{value} is not a valid breakdown name" + raise Exception("Config validation error: " + message) from None + if insight.get('action_breakdowns'): + value_checked, value = self._check_values(default_actions_breakdowns, insight.get('action_breakdowns')) + if not value_checked: + message = f"{value} is not a valid action_breakdown name" + raise Exception("Config validation error: " + message) from None return True - def _check_values(self, default_value: List[str], custom_value: List[str]) -> bool: + def _check_values(self, default_value: List[str], custom_value: List[str]) -> Tuple[bool, Any]: for e in custom_value: if e not in default_value: - logger.error(f"{e} does not appers in {default_value}") - return False - return True + logger.error(f"{e} does not appear in {default_value}") + return False, e + + return True, None def expand_local_ref(schema, resolver=None, **kwargs):