From 501d07f40229451aebcc65be8522aa7edabea7b4 Mon Sep 17 00:00:00 2001 From: Dmytro Date: Wed, 29 Sep 2021 09:32:19 +0300 Subject: [PATCH] Facebook marketing source: fix field value type conversion --- .pre-commit-config.yaml | 2 +- .../e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json | 2 +- .../resources/seed/source_definitions.yaml | 2 +- .../source-facebook-marketing/Dockerfile | 2 +- .../source-facebook-marketing/setup.py | 2 +- .../source_facebook_marketing/streams.py | 64 ++----------------- .../sources/facebook-marketing.md | 1 + 7 files changed, 11 insertions(+), 64 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ce71aaf602a3..73f39c22fc5f 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ repos: rev: v0.8.8 hooks: - id: licenseheaders - args: ["--tmpl=LICENSE", "--ext=py", "-f"] + args: ["--tmpl=LICENSE_SHORT", "--ext=py", "-f"] - repo: https://github.com/ambv/black rev: 20.8b1 diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json index 3e649449fa89..4fe24257139e 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c", "name": "Facebook Marketing", "dockerRepository": "airbyte/source-facebook-marketing", - "dockerImageTag": "0.2.17", + "dockerImageTag": "0.2.18", "documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing", "icon": "facebook.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 
297537ca7033..7a16ebb8bfb8 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -147,7 +147,7 @@ - sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c name: Facebook Marketing dockerRepository: airbyte/source-facebook-marketing - dockerImageTag: 0.2.17 + dockerImageTag: 0.2.18 documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing icon: facebook.svg sourceType: api diff --git a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile index 2edff54bae84..f108786a5a87 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.17 +LABEL io.airbyte.version=0.2.18 LABEL io.airbyte.name=airbyte/source-facebook-marketing diff --git a/airbyte-integrations/connectors/source-facebook-marketing/setup.py b/airbyte-integrations/connectors/source-facebook-marketing/setup.py index f642310b8230..829d1259323a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/setup.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/setup.py @@ -6,7 +6,7 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1", + "airbyte-cdk~=0.1.24", "cached_property~=1.5", "facebook_business~=11.0", "pendulum>=2,<3", diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py index eb2521c7f676..55dfd62a57f5 100644 --- 
a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py @@ -7,7 +7,7 @@ from abc import ABC from collections import deque from datetime import datetime -from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union +from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence import backoff import pendulum @@ -15,6 +15,7 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.core import package_name_from_class from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from cached_property import cached_property from facebook_business.adobjects.adreportrun import AdReportRun from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse @@ -45,6 +46,7 @@ class FBMarketingStream(Stream, ABC): """Base stream class""" primary_key = "id" + transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) page_size = 100 @@ -90,63 +92,7 @@ def read_records( ) -> Iterable[Mapping[str, Any]]: """Main read method used by CDK""" for record in self._read_records(params=self.request_params(stream_state=stream_state)): - yield self.transform(self._extend_record(record, fields=self.fields)) - - def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]: - """ - Use this method to remove update fields types in record according to schema. - """ - schema = self.get_json_schema() - self.convert_to_schema_types(record, schema["properties"]) - return record - - def get_python_type(self, _types: Union[list, str]) -> tuple: - """Converts types from schema to python types. 
Examples: - - `["string", "null"]` will be converted to `(str,)` - - `["array", "string", "null"]` will be converted to `(list, str,)` - - `"boolean"` will be converted to `(bool,)` - """ - types_mapping = { - "string": str, - "number": float, - "integer": int, - "object": dict, - "array": list, - "boolean": bool, - } - - if isinstance(_types, list): - return tuple([types_mapping[t] for t in _types if t != "null"]) - - return (types_mapping[_types],) - - def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]): - """ - Converts values' type from record to appropriate type from schema. For example, let's say we have `reach` value - and in schema it has `number` type because it's, well, a number, but from API we are getting `reach` as string. - This function fixes this and converts `reach` value from `string` to `number`. Same for all fields and all - types from schema. - """ - if not schema: - return - - for key, value in record.items(): - if key not in schema: - continue - - if isinstance(value, dict): - self.convert_to_schema_types(record=value, schema=schema[key].get("properties", {})) - elif isinstance(value, list) and "items" in schema[key]: - for record_list_item in value: - if list in self.get_python_type(schema[key]["items"]["type"]): - # TODO Currently we don't have support for list of lists. 
- pass - elif dict in self.get_python_type(schema[key]["items"]["type"]): - self.convert_to_schema_types(record=record_list_item, schema=schema[key]["items"]["properties"]) - elif not isinstance(record_list_item, self.get_python_type(schema[key]["items"]["type"])): - record[key] = self.get_python_type(schema[key]["items"]["type"])[0](record_list_item) - elif not isinstance(value, self.get_python_type(schema[key]["type"])): - record[key] = self.get_python_type(schema[key]["type"])[0](value) + yield self._extend_record(record, fields=self.fields) def _read_records(self, params: Mapping[str, Any]) -> Iterable: """Wrapper around query to backoff errors. @@ -361,7 +307,7 @@ def read_records( # because we query `lookback_window` days before actual cursor we might get records older then cursor for obj in result.get_result(): - yield self.transform(obj.export_all_data()) + yield obj.export_all_data() def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. 
diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md index be9188b2227a..967dd7848083 100644 --- a/docs/integrations/sources/facebook-marketing.md +++ b/docs/integrations/sources/facebook-marketing.md @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix failure when converting field values to schema types | | 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types | | 0.2.16 | 2021-09-14 | [6060](https://github.com/airbytehq/airbyte/pull/6060) | Fix schema for `ads_insights` stream | | 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |