Skip to content

Commit

Permalink
🎉Source Facebook Marketing: use AdAccount('act_{id}') to find account…
Browse files Browse the repository at this point in the history
… instead of iterating (#11751)
  • Loading branch information
mdibaiee authored Apr 15, 2022
1 parent 992d229 commit e78ed58
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.43
dockerImageTag: 0.2.44
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1716,7 +1716,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.43"
- dockerImage: "airbyte/source-facebook-marketing:0.2.44"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.43

LABEL io.airbyte.version=0.2.44
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import pendulum
from cached_property import cached_property
from facebook_business import FacebookAdsApi
from facebook_business.adobjects import user as fb_user
from facebook_business.adobjects.adaccount import AdAccount
from facebook_business.api import FacebookResponse
from facebook_business.exceptions import FacebookRequestError
Expand Down Expand Up @@ -174,11 +173,6 @@ def account(self) -> AdAccount:
def _find_account(account_id: str) -> AdAccount:
"""Actual implementation of find account"""
try:
accounts = fb_user.User(fbid="me").get_ad_accounts()
for account in accounts:
if account["account_id"] == account_id:
return account
return AdAccount(f"act_{account_id}").api_get()
except FacebookRequestError as exc:
raise FacebookAPIException(f"Error: {exc.api_error_code()}, {exc.api_error_message()}") from exc

raise FacebookAPIException("Couldn't find account with id {}".format(account_id))
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from typing import Any, List, Mapping, Tuple, Type

import pendulum
import requests
from airbyte_cdk.models import AuthSpecification, ConnectorSpecification, DestinationSyncMode, OAuth2Specification
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
Expand Down Expand Up @@ -43,10 +44,12 @@ def check_connection(self, _logger: "logging.Logger", config: Mapping[str, Any])
config = ConnectorConfig.parse_obj(config)
if pendulum.instance(config.end_date) < pendulum.instance(config.start_date):
raise ValueError("end_date must be equal or after start_date.")
api = API(account_id=config.account_id, access_token=config.access_token)
logger.info(f"Select account {api.account}")

return True, None
try:
api = API(account_id=config.account_id, access_token=config.access_token)
logger.info(f"Select account {api.account}")
return True, None
except requests.exceptions.RequestException as e:
return False, e

def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
"""Discovery method, returns available streams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ def api_fixture(some_config, requests_mock, fb_account_response):
api = API(account_id=some_config["account_id"], access_token=some_config["access_token"])

requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/me/adaccounts", [fb_account_response])
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{some_config['account_id']}/", [fb_account_response])
return api
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@
import pendulum
import pytest
import source_facebook_marketing
from facebook_business import FacebookAdsApi, FacebookSession
from facebook_business.adobjects.adaccount import AdAccount

FB_API_VERSION = FacebookAdsApi.API_VERSION


class TestMyFacebookAdsApi:
@pytest.fixture
def api(self):
def fb_api(self):
return source_facebook_marketing.api.MyFacebookAdsApi.init(access_token="foo", crash_log=False)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -42,12 +46,12 @@ def api(self):
],
)
def test__compute_pause_interval(
self, mocker, api, max_rate, max_pause_interval, min_pause_interval, usage, pause_interval, expected_pause_interval
self, mocker, fb_api, max_rate, max_pause_interval, min_pause_interval, usage, pause_interval, expected_pause_interval
):
mocker.patch.object(api, "MAX_RATE", max_rate)
mocker.patch.object(api, "MAX_PAUSE_INTERVAL", max_pause_interval)
mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval)
computed_pause_interval = api._compute_pause_interval(usage, pause_interval)
mocker.patch.object(fb_api, "MAX_RATE", max_rate)
mocker.patch.object(fb_api, "MAX_PAUSE_INTERVAL", max_pause_interval)
mocker.patch.object(fb_api, "MIN_PAUSE_INTERVAL", min_pause_interval)
computed_pause_interval = fb_api._compute_pause_interval(usage, pause_interval)
assert computed_pause_interval == expected_pause_interval

@pytest.mark.parametrize(
Expand Down Expand Up @@ -81,18 +85,18 @@ def test__compute_pause_interval(
),
],
)
def test__get_max_usage_pause_interval_from_batch(self, mocker, api, min_pause_interval, usages_pause_intervals, expected_output):
def test__get_max_usage_pause_interval_from_batch(self, mocker, fb_api, min_pause_interval, usages_pause_intervals, expected_output):
records = [
{"headers": [{"name": "USAGE", "value": usage}, {"name": "PAUSE_INTERVAL", "value": pause_interval}]}
for usage, pause_interval in usages_pause_intervals
]

mock_parse_call_rate_header = mocker.Mock(side_effect=usages_pause_intervals)
mocker.patch.object(api, "_parse_call_rate_header", mock_parse_call_rate_header)
mocker.patch.object(api, "MIN_PAUSE_INTERVAL", min_pause_interval)
mocker.patch.object(fb_api, "_parse_call_rate_header", mock_parse_call_rate_header)
mocker.patch.object(fb_api, "MIN_PAUSE_INTERVAL", min_pause_interval)

output = api._get_max_usage_pause_interval_from_batch(records)
api._parse_call_rate_header.assert_called_with(
output = fb_api._get_max_usage_pause_interval_from_batch(records)
fb_api._parse_call_rate_header.assert_called_with(
{"usage": usages_pause_intervals[-1][0], "pause_interval": usages_pause_intervals[-1][1]}
)
assert output == expected_output
Expand All @@ -108,24 +112,30 @@ def test__get_max_usage_pause_interval_from_batch(self, mocker, api, min_pause_i
(["not_batch"], 2, 1, False),
],
)
def test__handle_call_rate_limit(self, mocker, api, params, min_rate, usage, expect_sleep):
def test__handle_call_rate_limit(self, mocker, fb_api, params, min_rate, usage, expect_sleep):
pause_interval = 1
mock_response = mocker.Mock()

mocker.patch.object(api, "MIN_RATE", min_rate)
mocker.patch.object(api, "_get_max_usage_pause_interval_from_batch", mocker.Mock(return_value=(usage, pause_interval)))
mocker.patch.object(api, "_parse_call_rate_header", mocker.Mock(return_value=(usage, pause_interval)))
mocker.patch.object(api, "_compute_pause_interval")
mocker.patch.object(fb_api, "MIN_RATE", min_rate)
mocker.patch.object(fb_api, "_get_max_usage_pause_interval_from_batch", mocker.Mock(return_value=(usage, pause_interval)))
mocker.patch.object(fb_api, "_parse_call_rate_header", mocker.Mock(return_value=(usage, pause_interval)))
mocker.patch.object(fb_api, "_compute_pause_interval")
mocker.patch.object(source_facebook_marketing.api, "logger")
mocker.patch.object(source_facebook_marketing.api, "sleep")
assert api._handle_call_rate_limit(mock_response, params) is None
assert fb_api._handle_call_rate_limit(mock_response, params) is None
if "batch" in params:
api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value)
fb_api._get_max_usage_pause_interval_from_batch.assert_called_with(mock_response.json.return_value)
else:
api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value)
fb_api._parse_call_rate_header.assert_called_with(mock_response.headers.return_value)
if expect_sleep:
api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval)
source_facebook_marketing.api.sleep.assert_called_with(api._compute_pause_interval.return_value.total_seconds())
fb_api._compute_pause_interval.assert_called_with(usage=usage, pause_interval=pause_interval)
source_facebook_marketing.api.sleep.assert_called_with(fb_api._compute_pause_interval.return_value.total_seconds())
source_facebook_marketing.api.logger.warning.assert_called_with(
f"Utilization is too high ({usage})%, pausing for {api._compute_pause_interval.return_value}"
f"Utilization is too high ({usage})%, pausing for {fb_api._compute_pause_interval.return_value}"
)

def test_find_account(self, api, account_id, requests_mock):
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", [{"json": {"id": "act_test"}}])
account = api._find_account(account_id)
assert isinstance(account, AdAccount)
assert account.get_id() == "act_test"
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def test_batch_limit_reached(self, requests_mock, api, fb_call_rate_response, ac
]

requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/adcreatives", responses)
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", responses)
requests_mock.register_uri("POST", FacebookSession.GRAPH + f"/{FB_API_VERSION}/", batch_responses)

stream = AdCreatives(api=api, include_deleted=False)
Expand Down Expand Up @@ -124,6 +125,7 @@ def test_common_error_retry(self, error_response, requests_mock, api, account_id
]

requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/act_{account_id}/", responses)
requests_mock.register_uri("GET", FacebookSession.GRAPH + f"/{FB_API_VERSION}/{account_data['id']}/", responses)

stream = AdAccount(api=api)
accounts = list(stream.read_records(sync_mode=SyncMode.full_refresh, stream_state={}))
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ For more information, see the [Facebook Insights API documentation.](https://dev
## Changelog

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.2.44 | 2022-04-14 | [11751](https://github.com/airbytehq/airbyte/pull/11751) | Update API to a directly initialise an AdAccount with the given ID |
| 0.2.43 | 2022-04-13 | [11801](https://github.com/airbytehq/airbyte/pull/11801) | Fix `user_tos_accepted` schema to be an object
| 0.2.42 | 2022-04-06 | [11761](https://github.com/airbytehq/airbyte/pull/11761) | Upgrade Facebook Python SDK to version 13|
| 0.2.41 | 2022-03-28 | [11446](https://github.com/airbytehq/airbyte/pull/11446) | Increase number of attempts for individual jobs |
Expand Down

0 comments on commit e78ed58

Please sign in to comment.