-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathcomponents.py
96 lines (84 loc) · 3.82 KB
/
components.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from dataclasses import dataclass
from http import HTTPStatus
from itertools import chain
from typing import Any, List, Mapping, Union
import dpath.util
import requests
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.types import Config, Record
from requests import HTTPError
@dataclass
class AuthenticatorFacebookPageAccessToken(NoAuth):
config: Config
page_id: Union[InterpolatedString, str]
access_token: Union[InterpolatedString, str]
def __post_init__(self, options: Mapping[str, Any]):
self._page_id = InterpolatedString.create(self.page_id, options=options).eval(self.config)
self._access_token = InterpolatedString.create(self.access_token, options=options).eval(self.config)
def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
"""Attach the page access token to params to authenticate on the HTTP request"""
page_access_token = self.generate_page_access_token()
request.prepare_url(url=request.url, params={"access_token": page_access_token})
return request
# @staticmethod
def generate_page_access_token(self) -> str:
# We are expecting to receive User access token from config. To access
# Pages API we need to generate Page access token. Page access tokens
# can be generated from another Page access token (with the same page ID)
# so if user manually set Page access token instead of User access
# token it would be no problem unless it has wrong page ID.
# https://developers.facebook.com/docs/pages/access-tokens#get-a-page-access-token
try:
r = requests.get(f"https://graph.facebook.com/{self._page_id}", params={"fields": "access_token",
"access_token": self._access_token})
if r.status_code != HTTPStatus.OK:
raise HTTPError(r.text)
return r.json().get("access_token")
except Exception as e:
raise Exception(f"Error while generating page access token: {e}") from e
@dataclass
class NestedDpathExtractor(DpathExtractor):
"""
Record extractor that searches a decoded response over a path defined as an array of fields.
Extends the DpathExtractor to allow for a list of records to be generated from a dpath that points
to an array object as first point and iterates over list of records by the rest of path. See the example.
Example data:
```
{
"data": [
{'insights':
{'data': [
{"id": "id1",
"name": "name1",
...
},
{"id": "id1",
"name": "name1",
...
},
...
},
...
]
}
```
"""
def extract_records(self, response: requests.Response) -> List[Record]:
response_body = self.decoder.decode(response)
if len(self.field_pointer) == 0:
extracted = response_body
else:
pointer = [pointer.eval(self.config) for pointer in self.field_pointer]
extracted_list = dpath.util.get(response_body, pointer[0], default=[])
extracted = list(chain(*[dpath.util.get(x, pointer[1:], default=[]) for x in extracted_list])) if extracted_list else []
if isinstance(extracted, list):
return extracted
elif extracted:
return [extracted]
else:
return []