Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Connector Facebook-Marketing: update insights streams with custom entries for fields, breakdowns and action_breakdowns #7158

Merged
merged 15 commits into from
Oct 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.20",
"dockerImageTag": "0.2.21",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.20
dockerImageTag: 0.2.21
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.20
LABEL io.airbyte.version=0.2.21
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,94 @@
"minimum": 1,
"maximum": 30,
"type": "integer"
},
"custom_insights": {
"title": "Custom Insights",
"description": "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
"type": "array",
"items": {
"title": "InsightConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name value of insight",
"type": "string"
},
"fields": {
"title": "Fields",
"description": "A list of chosen fields for fields parameter",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"breakdowns": {
"title": "Breakdowns",
"description": "A list of chosen breakdowns for breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"action_breakdowns": {
"title": "Action Breakdowns",
"description": "A list of chosen action_breakdowns for action_breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["name"]
}
}
},
"required": ["account_id", "access_token", "start_date"]
"required": ["account_id", "access_token", "start_date"],
"definitions": {
"InsightConfig": {
"title": "InsightConfig",
"type": "object",
"properties": {
"name": {
"title": "Name",
"description": "The name value of insight",
"type": "string"
},
"fields": {
"title": "Fields",
"description": "A list of chosen fields for fields parameter",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"breakdowns": {
"title": "Breakdowns",
"description": "A list of chosen breakdowns for breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
},
"action_breakdowns": {
"title": "Action Breakdowns",
"description": "A list of chosen action_breakdowns for action_breakdowns",
"default": [],
"type": "array",
"items": {
"type": "string"
}
}
},
"required": ["name"]
}
}
},
"supportsIncremental": true,
"supported_destination_sync_modes": ["append"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@
from time import sleep

import pendulum
from airbyte_cdk.entrypoint import logger
import logging
from cached_property import cached_property
from facebook_business import FacebookAdsApi
from facebook_business.adobjects import user as fb_user
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.exceptions import FacebookRequestError
from source_facebook_marketing.common import FacebookAPIException

logger = logging.getLogger(__name__)


class MyFacebookAdsApi(FacebookAdsApi):
"""Custom Facebook API class to intercept all API calls and handle call rate limits"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import backoff
import pendulum
from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): register logger as standard python logger
import logging
from facebook_business.exceptions import FacebookRequestError

# The Facebook API error codes indicating rate-limiting are listed at
Expand All @@ -16,6 +16,8 @@
FACEBOOK_UNKNOWN_ERROR_CODE = 99
DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1)

logger = logging.getLogger(__name__)


class FacebookAPIException(Exception):
"""General class for all API errors"""
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"properties": {
"action_device": {"type": ["null", "string"]},
"action_canvas_component_name": {"type": ["null", "string"]},
"action_carousel_card_id": {"type": ["null", "string"]},
"action_carousel_card_name": {"type": ["null", "string"]},
"action_destination": {"type": ["null", "string"]},
"action_reaction": {"type": ["null", "string"]},
"action_target_id": {"type": ["null", "string"]},
"action_type": {"type": ["null", "string"]},
"action_video_sound": {"type": ["null", "string"]},
"action_video_type": {"type": ["null", "string"]},
"ad_format_asset": {"type": ["null", "string"]},
"age": {"type": ["null", "string"]},
"app_id": {"type": ["null", "string"]},
"body_asset": {"type": ["null", "string"]},
"call_to_action_asset": {"type": ["null", "string"]},
"country": {"type": ["null", "string"]},
"description_asset": {"type": ["null", "string"]},
"device_platform": {"type": ["null", "string"]},
"dma": {"type": ["null", "string"]},
"frequency_value": {"type": ["null", "string"]},
"gender": {"type": ["null", "string"]},
"hourly_stats_aggregated_by_advertiser_time_zone": {"type": ["null", "string"]},
"hourly_stats_aggregated_by_audience_time_zone": {"type": ["null", "string"]},
"image_asset": {"type": ["null", "string"]},
"impression_device": {"type": ["null", "string"]},
"link_url_asset": {"type": ["null", "string"]},
"place_page_id": {"type": ["null", "string"]},
"platform_position": {"type": ["null", "string"]},
"product_id": {"type": ["null", "string"]},
"publisher_platform": {"type": ["null", "string"]},
"region": {"type": ["null", "string"]},
"skan_conversion_id": {"type": ["null", "string"]},
"title_asset": {"type": ["null", "string"]},
"video_asset": {"type": ["null", "string"]}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging
from datetime import datetime
from typing import Any, List, Mapping, Optional, Tuple, Type, MutableMapping
from jsonschema import RefResolver


from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteConnectionStatus, AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification, Status

from typing import Any, List, Mapping, Optional, Tuple, Type

import pendulum
from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pydantic import BaseModel, Field
from source_facebook_marketing.api import API
from source_facebook_marketing.streams import (
Expand All @@ -26,6 +35,19 @@
)


logger = logging.getLogger(__name__)

class InsightConfig(BaseModel):

name: str = Field(description="The name value of insight")

fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter", default=[])

breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns", default=[])

action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[])


class ConnectorConfig(BaseModel):
class Config:
title = "Source Facebook Marketing"
Expand Down Expand Up @@ -65,6 +87,9 @@ class Config:
minimum=1,
maximum=30,
)
custom_insights: Optional[List[InsightConfig]] = Field(
description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
)


class SourceFacebookMarketing(AbstractSource):
Expand Down Expand Up @@ -104,10 +129,11 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
days_per_job=config.insights_days_per_job,
)

return [
streams = [
Campaigns(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
AdSets(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Ads(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),

AdCreatives(api=api),
AdsInsights(**insights_args),
AdsInsightsAgeAndGender(**insights_args),
Expand All @@ -118,6 +144,22 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
AdsInsightsActionType(**insights_args),
]

return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams)

def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
"""Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification."""
try:
check_succeeded, error = self.check_connection(logger, config)
if not check_succeeded:
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error))
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e))

self._check_custom_insights_entries(config.get('custom_insights', []))

return AirbyteConnectionStatus(status=Status.SUCCEEDED)


def spec(self, *args, **kwargs) -> ConnectorSpecification:
"""
Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password)
Expand All @@ -128,11 +170,80 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification:
changelogUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing",
supportsIncremental=True,
supported_destination_sync_modes=[DestinationSyncMode.append],
connectionSpecification=ConnectorConfig.schema(),
connectionSpecification=expand_local_ref(ConnectorConfig.schema()),
authSpecification=AuthSpecification(
auth_type="oauth2.0",
oauth2Specification=OAuth2Specification(
rootObject=[], oauthFlowInitParameters=[], oauthFlowOutputParameters=[["access_token"]]
),
)
),
)

def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]:
"""Update method, if insights have values returns streams replacing the
default insights streams else returns streams

"""
if not insights:
return streams

insights_custom_streams = list()

for insight in insights:
args["name"] = f"Custom{insight.name}"
args["fields"] = list(set(insight.fields))
args["breakdowns"] = list(set(insight.breakdowns))
args["action_breakdowns"] = list(set(insight.action_breakdowns))
insight_stream = AdsInsights(**args)
insights_custom_streams.append(insight_stream)

return streams + insights_custom_streams

def _check_custom_insights_entries(self, insights: List[Mapping[str, Any]]):

default_fields = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights").get("properties", {}).keys())
default_breakdowns = list(ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights_breakdowns").get("properties", {}).keys())
default_actions_breakdowns = [e for e in default_breakdowns if 'action_' in e]

for insight in insights:
if insight.get('fields'):
value_checked, value = self._check_values(default_fields, insight.get('fields'))
if not value_checked:
message = f"{value} is not a valid field name"
raise Exception("Config validation error: " + message) from None
if insight.get('breakdowns'):
value_checked, value = self._check_values(default_breakdowns, insight.get('breakdowns'))
if not value_checked:
message = f"{value} is not a valid breakdown name"
raise Exception("Config validation error: " + message) from None
if insight.get('action_breakdowns'):
value_checked, value = self._check_values(default_actions_breakdowns, insight.get('action_breakdowns'))
if not value_checked:
message = f"{value} is not a valid action_breakdown name"
raise Exception("Config validation error: " + message) from None

return True

def _check_values(self, default_value: List[str], custom_value: List[str]) -> Tuple[bool, Any]:
for e in custom_value:
if e not in default_value:
logger.error(f"{e} does not appear in {default_value}")
return False, e

return True, None


def expand_local_ref(schema, resolver=None, **kwargs):
resolver = resolver or RefResolver("", schema)
if isinstance(schema, MutableMapping):
if "$ref" in schema:
ref_url = schema.pop("$ref")
url, resolved_schema = resolver.resolve(ref_url)
schema.update(resolved_schema)
for key, value in schema.items():
schema[key] = expand_local_ref(value, resolver=resolver)
return schema
elif isinstance(schema, List):
return [expand_local_ref(item, resolver=resolver) for item in schema]

return schema
Loading