From 88ebc9dfb2f195bfb4c2a65d63722f96c4e80324 Mon Sep 17 00:00:00 2001 From: Keith Thompson Date: Wed, 29 Jun 2022 17:53:20 +0100 Subject: [PATCH 1/4] Add max batch size config --- .../integration_tests/spec.json | 8 +++++++ .../source_facebook_marketing/source.py | 22 ++++++++++++------- .../source_facebook_marketing/spec.py | 7 ++++++ .../streams/base_streams.py | 7 +++--- .../streams/common.py | 1 - .../unit_tests/test_base_streams.py | 3 +-- 6 files changed, 34 insertions(+), 14 deletions(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json index 338bbe162a27..7196ef365f01 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json @@ -320,6 +320,14 @@ "mininum": 1, "exclusiveMinimum": 0, "type": "integer" + }, + "max_batch_size": { + "title": "Maximum size of Batched Requests", + "description": "Maximum batch size used when sending batch requests to Facebook API. 
Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases.", + "default": 50, + "order": 9, + "exclusiveMinimum": 0, + "type": "integer" } }, "required": ["account_id", "start_date", "access_token"] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index fda3f96e10ae..bc0b3a80e0b7 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -71,6 +71,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size, ), Ads( api=api, @@ -78,21 +79,23 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size ), - AdCreatives(api=api, fetch_thumbnail_images=config.fetch_thumbnail_images, page_size=config.page_size), - AdsInsights(page_size=config.page_size, **insights_args), - AdsInsightsAgeAndGender(page_size=config.page_size, **insights_args), - AdsInsightsCountry(page_size=config.page_size, **insights_args), - AdsInsightsRegion(page_size=config.page_size, **insights_args), - AdsInsightsDma(page_size=config.page_size, **insights_args), - AdsInsightsPlatformAndDevice(page_size=config.page_size, **insights_args), - AdsInsightsActionType(page_size=config.page_size, **insights_args), + AdCreatives(api=api, fetch_thumbnail_images=config.fetch_thumbnail_images, page_size=config.page_size, max_batch_size=config.max_batch_size), + AdsInsights(page_size=config.page_size, max_batch_size=config.max_batch_size, 
**insights_args), + AdsInsightsAgeAndGender(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), + AdsInsightsCountry(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), + AdsInsightsRegion(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), + AdsInsightsDma(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), + AdsInsightsPlatformAndDevice(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), + AdsInsightsActionType(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), Campaigns( api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size ), Images( api=api, @@ -100,6 +103,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size ), Videos( api=api, @@ -107,6 +111,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size ), Activities( api=api, @@ -114,6 +119,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, + max_batch_size=config.max_batch_size ), ] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py index 718247fa5e68..5b7f435dc41f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py +++ 
b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py @@ -172,3 +172,10 @@ class Config: mininum=1, default=28, ) + + max_batch_size: Optional[PositiveInt] = Field( + title="Maximum size of Batched Requests", + order=9, + description="Maximum batch size used when sending batch requests to Facebook API. Most users do not need to set this field unless they specifically need to tune the connector to address specific issues or use cases.", + default=50, + ) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py index 85b24b9397fc..6dc004631858 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py @@ -16,7 +16,7 @@ from facebook_business.adobjects.adimage import AdImage from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse -from .common import MAX_BATCH_SIZE, deep_merge +from .common import deep_merge if TYPE_CHECKING: # pragma: no cover from source_facebook_marketing.api import API @@ -37,11 +37,12 @@ class FBMarketingStream(Stream, ABC): # entity prefix for `include_deleted` filter, it usually matches singular version of stream name entity_prefix = None - def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 100, **kwargs): + def __init__(self, api: "API", include_deleted: bool = False, page_size: int = 100, max_batch_size: int = 50, **kwargs): super().__init__(**kwargs) self._api = api self.page_size = page_size if page_size is not None else 100 self._include_deleted = include_deleted if self.enable_deleted else False + self.max_batch_size = max_batch_size if max_batch_size is not None else 50 @cached_property def 
fields(self) -> List[str]: @@ -68,7 +69,7 @@ def failure(response: FacebookResponse): api_batch: FacebookAdsApiBatch = self._api.api.new_batch() for request in pending_requests: api_batch.add_request(request, success=success, failure=failure) - if len(api_batch) == MAX_BATCH_SIZE: + if len(api_batch) == self.max_batch_size: self._execute_batch(api_batch) yield from records records = [] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py index a32e09638061..408a0b34e927 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py @@ -19,7 +19,6 @@ FACEBOOK_UNKNOWN_ERROR_CODE = 99 FACEBOOK_CONNECTION_RESET_ERROR_CODE = 104 DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1) -MAX_BATCH_SIZE = 50 logger = logging.getLogger("airbyte") diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py index 093f46f944c0..acd3792922b6 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_base_streams.py @@ -11,7 +11,6 @@ from facebook_business.api import FacebookAdsApi, FacebookAdsApiBatch, FacebookRequest from source_facebook_marketing.api import MyFacebookAdsApi from source_facebook_marketing.streams.base_streams import FBMarketingStream -from source_facebook_marketing.streams.common import MAX_BATCH_SIZE @pytest.fixture(name="mock_batch_responses") @@ -64,7 +63,7 @@ def test_execute_in_batch_with_many_requests(self, api, batch, mock_batch_respon ) stream = SomeTestStream(api=api) - requests = 
[FacebookRequest("node", "GET", "endpoint") for _ in range(MAX_BATCH_SIZE + 1)] + requests = [FacebookRequest("node", "GET", "endpoint") for _ in range(50 + 1)] result = list(stream.execute_in_batch(requests)) From d213d26ce12a1f48c708caa04fd2f077eae420c4 Mon Sep 17 00:00:00 2001 From: Keith Thompson Date: Wed, 29 Jun 2022 18:30:26 +0100 Subject: [PATCH 2/4] Bump semver --- .../connectors/source-facebook-marketing/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 5aeb4ca834d9..3b4f866c6eaf 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.53 +LABEL io.airbyte.version=0.2.54 LABEL io.airbyte.name=airbyte/source-facebook-marketing From d5137a7c595884fc9f5a3c8a3a300c5ee6e9c832 Mon Sep 17 00:00:00 2001 From: Keith Thompson Date: Wed, 29 Jun 2022 18:36:27 +0100 Subject: [PATCH 3/4] add changelog --- .../src/main/resources/seed/source_definitions.yaml | 2 +- .../init/src/main/resources/seed/source_specs.yaml | 11 ++++++++++- docs/integrations/sources/facebook-marketing.md | 1 + 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 567cef88972f..455bf615b7e9 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -248,7 +248,7 @@ - name: Facebook Marketing sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c dockerRepository: airbyte/source-facebook-marketing - 
dockerImageTag: 0.2.53 + dockerImageTag: 0.2.54 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 72e23acf5b42..8c512570de68 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1827,7 +1827,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-facebook-marketing:0.2.53" +- dockerImage: "airbyte/source-facebook-marketing:0.2.54" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing" @@ -2159,6 +2159,15 @@ mininum: 1 exclusiveMinimum: 0 type: "integer" + max_batch_size: + title: "Maximum size of Batched Requests" + description: "Maximum batch size used when sending batch requests to Facebook API.\ + \ Most users do not need to set this field unless they specifically need to tune\ + \ the connector to address specific issues or use cases." 
+ default: 50 + order: 9 + exclusiveMinimum: 0 + type: "integer" required: - "account_id" - "start_date" diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index 692f2ab282f4..3c55348b68fd 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -120,6 +120,7 @@ Please be informed that the connector uses the `lookback_window` parameter to pe | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.2.54 | 2022-06-29 | [14267](https://github.com/airbytehq/airbyte/pull/14267) | Add `max_batch_size` config option (replaces the hard-coded `MAX_BATCH_SIZE` constant; default remains 50) | | 0.2.53 | 2022-06-16 | [13623](https://github.com/airbytehq/airbyte/pull/13623) | Add fields `bid_amount` `bid_strategy` `bid_constraints` to `ads_set` stream | | 0.2.52 | 2022-06-14 | [13749](https://github.com/airbytehq/airbyte/pull/13749) | Fix the `not syncing any data` issue | | 0.2.51 | 2022-05-30 | [13317](https://github.com/airbytehq/airbyte/pull/13317) | Change tax_id to string (Canadian has letter in tax_id) | From 9ccd91cc6b2a480bceafa9b761144182d4f73ec6 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Mon, 4 Jul 2022 18:05:18 +0000 Subject: [PATCH 4/4] auto-bump connector version --- .../src/main/resources/seed/source_specs.yaml | 6 +++--- .../source_facebook_marketing/source.py | 17 +++++++++++------ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 1cb9a6663f21..03da702140ac 100644 --- 
a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2161,9 +2161,9 @@ type: "integer" max_batch_size: title: "Maximum size of Batched Requests" - description: "Maximum batch size used when sending batch requests to Facebook API.\ - \ Most users do not need to set this field unless they specifically need to tune\ - \ the connector to address specific issues or use cases." + description: "Maximum batch size used when sending batch requests to Facebook\ + \ API. Most users do not need to set this field unless they specifically\ + \ need to tune the connector to address specific issues or use cases." default: 50 order: 9 exclusiveMinimum: 0 diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index bc0b3a80e0b7..3a1f2d59568d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -79,9 +79,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, - max_batch_size=config.max_batch_size + max_batch_size=config.max_batch_size, + ), + AdCreatives( + api=api, + fetch_thumbnail_images=config.fetch_thumbnail_images, + page_size=config.page_size, + max_batch_size=config.max_batch_size, ), - AdCreatives(api=api, fetch_thumbnail_images=config.fetch_thumbnail_images, page_size=config.page_size, max_batch_size=config.max_batch_size), AdsInsights(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), AdsInsightsAgeAndGender(page_size=config.page_size, max_batch_size=config.max_batch_size, **insights_args), AdsInsightsCountry(page_size=config.page_size, 
max_batch_size=config.max_batch_size, **insights_args), @@ -95,7 +100,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, - max_batch_size=config.max_batch_size + max_batch_size=config.max_batch_size, ), Images( api=api, @@ -103,7 +108,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, - max_batch_size=config.max_batch_size + max_batch_size=config.max_batch_size, ), Videos( api=api, @@ -111,7 +116,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, - max_batch_size=config.max_batch_size + max_batch_size=config.max_batch_size, ), Activities( api=api, @@ -119,7 +124,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: end_date=config.end_date, include_deleted=config.include_deleted, page_size=config.page_size, - max_batch_size=config.max_batch_size + max_batch_size=config.max_batch_size, ), ]