Skip to content

Commit

Permalink
Facebook marketing source: fix field value type conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro authored Sep 29, 2021
1 parent 0105ca9 commit 501d07f
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 64 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ repos:
rev: v0.8.8
hooks:
- id: licenseheaders
args: ["--tmpl=LICENSE", "--ext=py", "-f"]
args: ["--tmpl=LICENSE_SHORT", "--ext=py", "-f"]

- repo: https://github.com/ambv/black
rev: 20.8b1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
"name": "Facebook Marketing",
"dockerRepository": "airbyte/source-facebook-marketing",
"dockerImageTag": "0.2.17",
"dockerImageTag": "0.2.18",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
"icon": "facebook.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
- sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
name: Facebook Marketing
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.17
dockerImageTag: 0.2.18
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.17
LABEL io.airbyte.version=0.2.18
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk~=0.1.24",
"cached_property~=1.5",
"facebook_business~=11.0",
"pendulum>=2,<3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
from abc import ABC
from collections import deque
from datetime import datetime
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Union
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence

import backoff
import pendulum
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.core import package_name_from_class
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from cached_property import cached_property
from facebook_business.adobjects.adreportrun import AdReportRun
from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
Expand Down Expand Up @@ -45,6 +46,7 @@ class FBMarketingStream(Stream, ABC):
"""Base stream class"""

primary_key = "id"
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)

page_size = 100

Expand Down Expand Up @@ -90,63 +92,7 @@ def read_records(
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
yield self.transform(self._extend_record(record, fields=self.fields))

def transform(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Coerce the field values of ``record`` to the types declared in the stream's JSON schema.

    The record is modified in place by ``convert_to_schema_types`` and the same
    object is returned, so the call can be used inline in a ``yield``.

    :param record: a single record as produced by the stream
    :return: the same ``record`` object, with values converted where needed
    """
    schema = self.get_json_schema()
    # A schema without "properties" has nothing to convert against; pass an empty
    # mapping (which convert_to_schema_types treats as a no-op) instead of raising KeyError.
    self.convert_to_schema_types(record, schema.get("properties", {}))
    return record

def get_python_type(self, _types: Union[list, str]) -> tuple:
    """Convert JSON-schema type name(s) to a tuple of Python types.

    ``"null"`` is never mapped to a Python type; it is dropped whether it is
    given inside a list or as a bare string. Examples:
    - `["string", "null"]` will be converted to `(str,)`
    - `["array", "string", "null"]` will be converted to `(list, str,)`
    - `"boolean"` will be converted to `(bool,)`
    - `"null"` will be converted to `()`

    :param _types: a single schema type name or a list of them
    :return: tuple of Python types, in the same order as the given names
    :raises KeyError: if a type name other than "null" is not a known schema type
    """
    types_mapping = {
        "string": str,
        "number": float,
        "integer": int,
        "object": dict,
        "array": list,
        "boolean": bool,
    }

    # Normalise a bare string to a one-element list so both input forms share one code path
    # (and "null" is filtered identically in both).
    names = _types if isinstance(_types, list) else [_types]
    return tuple(types_mapping[name] for name in names if name != "null")

def convert_to_schema_types(self, record: Mapping[str, Any], schema: Mapping[str, Any]):
    """Convert the values of ``record`` in place to the types declared in ``schema``.

    For example, let's say we have a `reach` value which has the `number` type in the schema,
    but the API returns it as a string. This function converts `reach` from `string` to `number`.
    The same is done for every field that is present in the schema, recursing into nested
    objects and into arrays of scalars or objects.

    Values that are ``None``, keys that are missing from the schema, and schema entries without
    a usable ``type`` are left untouched.

    :param record: the record (or nested object) to fix; it is mutated in place
    :param schema: the ``properties`` mapping of the JSON schema describing ``record``
    :raises ValueError: if a value cannot be converted to the declared type (e.g. ``int("abc")``)
    """
    if not schema:
        return

    for key, value in record.items():
        field_schema = schema.get(key)
        # Nulls are always left as they are: coercing None would give "None" for strings
        # and raise for numbers.
        if not field_schema or value is None:
            continue

        if isinstance(value, dict):
            self.convert_to_schema_types(record=value, schema=field_schema.get("properties", {}))
        elif isinstance(value, list) and "items" in field_schema:
            items_schema = field_schema["items"]
            if "type" not in items_schema:
                continue
            item_types = self.get_python_type(items_schema["type"])
            if list in item_types:
                # TODO Currently we don't have support for list of lists.
                continue
            for index, item in enumerate(value):
                if item is None:
                    continue
                if isinstance(item, dict):
                    if dict in item_types:
                        self.convert_to_schema_types(record=item, schema=items_schema.get("properties", {}))
                elif item_types and not isinstance(item, item_types):
                    # Replace only this element; the list itself must stay a list.
                    value[index] = item_types[0](item)
        elif "type" in field_schema:
            value_types = self.get_python_type(field_schema["type"])
            # An empty tuple means the schema only allows null: there is nothing to convert to.
            if value_types and not isinstance(value, value_types):
                # Re-assigning an existing key does not change the dict size, so it is safe
                # to do while iterating over record.items().
                record[key] = value_types[0](value)
yield self._extend_record(record, fields=self.fields)

def _read_records(self, params: Mapping[str, Any]) -> Iterable:
"""Wrapper around query to backoff errors.
Expand Down Expand Up @@ -361,7 +307,7 @@ def read_records(
# because we query `lookback_window` days before the actual cursor, we might get records older than the cursor

for obj in result.get_result():
yield self.transform(obj.export_all_data())
yield obj.export_all_data()

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
"""Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time.
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ With the Ad Account ID and API access token, you should be ready to start pullin

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix failing conversion of field values |
| 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
| 0.2.16 | 2021-09-14 | [6060](https://github.com/airbytehq/airbyte/pull/6060) | Fix schema for `ads_insights` stream |
| 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |
Expand Down

0 comments on commit 501d07f

Please sign in to comment.