-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
source.py
227 lines (209 loc) · 8.37 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import logging
from typing import Any, List, Mapping, Optional, Tuple

from airbyte_cdk.models import FailureType, SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.utils import AirbyteTracedException
from requests.exceptions import ConnectionError, RequestException, SSLError

from .auth import MissingAccessTokenError, ShopifyAuthenticator
from .scopes import ShopifyScopes
from .streams.streams import (
    AbandonedCheckouts,
    Articles,
    BalanceTransactions,
    Blogs,
    Collections,
    Collects,
    Countries,
    CustomCollections,
    CustomerAddress,
    CustomerJourneySummary,
    Customers,
    CustomerSavedSearch,
    DiscountCodes,
    Disputes,
    DraftOrders,
    FulfillmentOrders,
    Fulfillments,
    InventoryItems,
    InventoryLevels,
    Locations,
    MetafieldArticles,
    MetafieldBlogs,
    MetafieldCollections,
    MetafieldCustomers,
    MetafieldDraftOrders,
    MetafieldLocations,
    MetafieldOrders,
    MetafieldPages,
    MetafieldProductImages,
    MetafieldProducts,
    MetafieldProductVariants,
    MetafieldShops,
    MetafieldSmartCollections,
    OrderAgreements,
    OrderRefunds,
    OrderRisks,
    Orders,
    Pages,
    PriceRules,
    ProductImages,
    Products,
    ProductsGraphQl,
    ProductVariants,
    Shop,
    SmartCollections,
    TenderTransactions,
    Transactions,
    TransactionsGraphql,
)
class ConnectionCheckTest:
    """
    Probe the Shopify API through the `Shop` stream.

    Used both to validate the user's configuration (`test_connection`)
    and to resolve the `id` of the configured shop (`get_shop_id`).
    """

    def __init__(self, config: Mapping[str, Any]) -> None:
        """
        :param config: connector configuration; it is handed to the `Shop` stream as-is.
        """
        self.config = config
        # use `Shop` as a test stream for connection check
        self.test_stream = Shop(self.config)
        # setting `max_retries` to 0 for the stage of `check connection`,
        # because it keeps retrying for wrong shop names,
        # but it should stop immediately
        self.test_stream._http_client.max_retries = 0

    def describe_error(self, pattern: str, shop_name: Optional[str] = None, details: Any = None, **kwargs) -> Optional[str]:
        """
        Translate an error `pattern` key into a human-readable message.

        :param pattern: one of the keys of the map below
        :param shop_name: shop name interpolated into the messages that mention it
        :param details: extra information interpolated into the `request_exception` message
        :param kwargs: accepted for call-site flexibility, currently unused
        :return: the message, or `None` when `pattern` is not a known key
        """
        connection_check_errors_map: Mapping[str, str] = {
            "connection_error": f"Connection could not be established using `Shopify Store`: {shop_name}. Make sure it's valid and try again.",
            "request_exception": f"Request was not successfull, check your `input configuation` and try again. Details: {details}",
            "index_error": f"Failed to access the Shopify store `{shop_name}`. Verify the entered Shopify store or API Key in `input configuration`.",
            "missing_token_error": "Authentication was unsuccessful. Please verify your authentication credentials or login is correct.",
            # add the other patterns and description, if needed...
        }
        return connection_check_errors_map.get(pattern)

    def test_connection(self) -> Tuple[bool, Optional[str]]:
        """
        Read the `Shop` stream once and report whether the connection works.

        :return: `(True, None)` when a record with a non-`None` `id` was read,
            otherwise `(False, <error description>)`.
        """
        shop_name = self.config.get("shop")
        if not shop_name:
            return False, "The `Shopify Store` name is missing. Make sure it's entered and valid."

        # Only the read itself is guarded: the handlers must not depend on
        # `response`, which is still unbound when the read is what raised.
        try:
            response = list(self.test_stream.read_records(sync_mode=SyncMode.full_refresh))
        except (SSLError, ConnectionError):
            return False, self.describe_error("connection_error", shop_name)
        except RequestException as req_error:
            return False, self.describe_error("request_exception", details=req_error)
        except IndexError:
            return False, self.describe_error("index_error", shop_name)
        except MissingAccessTokenError:
            return False, self.describe_error("missing_token_error")

        # no records at all is reported the same way as an `IndexError`
        if not response:
            return False, self.describe_error("index_error", shop_name)

        # check for the shop_id is present in the response
        shop_id = response[0].get("id")
        if shop_id is None:
            return False, f"The `shop_id` is invalid: {shop_id}"
        return True, None

    def get_shop_id(self) -> str:
        """
        We need to have the `shop_id` value available to have it passed elsewhere and fill-in the missing data.
        By the time this method is triggered, we are sure we've passed the `Connection Checks` and have the `shop_id` value.

        :return: the `id` of the first `Shop` record
        :raises AirbyteTracedException: (config error) when the `Shop` stream returns no records
        :raises Exception: when the first record has no truthy `id`
        """
        response = list(self.test_stream.read_records(sync_mode=SyncMode.full_refresh))
        if not response:
            raise AirbyteTracedException(
                message=f"Could not find a Shopify shop with the name {self.config.get('shop', '')}. Make sure it's valid.",
                failure_type=FailureType.config_error,
            )
        shop_id = response[0].get("id")
        if shop_id:
            return shop_id
        raise Exception(f"Couldn't get `shop_id`. Actual `response`: {response}.")
class SourceShopify(AbstractSource):
    """
    Shopify source: checks the connection and builds the list of streams
    permitted by the scopes of the authenticated user.
    """

    @property
    def continue_sync_on_stream_failure(self) -> bool:
        # Always `True`.
        # NOTE(review): presumably read by the CDK to keep syncing the remaining
        # streams after one of them fails — confirm against `AbstractSource`.
        return True

    @staticmethod
    def get_shop_name(config) -> Optional[str]:
        """
        Return the configured `shop` with everything from `.myshopify.com` onwards removed.

        A missing or empty `shop` is returned unchanged (instead of raising `TypeError`),
        so the connection check can report the missing store name itself.
        """
        shop_name = config.get("shop")
        if not shop_name:
            return shop_name
        # `partition` yields the whole string when the suffix is absent
        return shop_name.partition(".myshopify.com")[0]

    @staticmethod
    def format_stream_name(name) -> str:
        """
        Turn a snake_case stream name into the capitalized, underscore-free form,
        e.g. `order_refunds` -> `OrderRefunds`.
        """
        return "".join(x.capitalize() for x in name.split("_"))

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """
        Testing connection availability for the connector.

        Note: `config` is updated in place with the normalized `shop` and an `authenticator`.

        :return: the `(success, error_description)` pair produced by `ConnectionCheckTest.test_connection`
        """
        config["shop"] = self.get_shop_name(config)
        config["authenticator"] = ShopifyAuthenticator(config)
        return ConnectionCheckTest(config).test_connection()

    def select_transactions_stream(self, config: Mapping[str, Any]) -> Stream:
        """
        Allow the Customer to decide which API type to use when it comes to the `Transactions` stream.

        :return: `Transactions` when `fetch_transactions_user_id` is truthy in `config`,
            otherwise `TransactionsGraphql`.
        """
        should_fetch_user_id = config.get("fetch_transactions_user_id")
        if should_fetch_user_id:
            return Transactions(config)
        return TransactionsGraphql(config)

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """
        Mapping a input config of the user input configuration as defined in the connector spec.
        Defining streams to run.

        Note: `config` is updated in place with the normalized `shop`, an `authenticator` and the `shop_id`.

        :return: the stream instances whose formatted name is among the permitted streams
        """
        config["shop"] = self.get_shop_name(config)
        config["authenticator"] = ShopifyAuthenticator(config)
        # add the `shop_id` value
        config["shop_id"] = ConnectionCheckTest(config).get_shop_id()
        # define scopes checker
        scopes_manager: ShopifyScopes = ShopifyScopes(config)
        # get the list of the permitted streams, based on the authenticated user scopes
        permitted_streams = scopes_manager.get_permitted_streams()
        stream_instances = [
            AbandonedCheckouts(config),
            Articles(config),
            BalanceTransactions(config),
            Blogs(config),
            Collections(config),
            Collects(config),
            CustomCollections(config),
            CustomerJourneySummary(config),
            Customers(config),
            DiscountCodes(config),
            Disputes(config),
            DraftOrders(config),
            FulfillmentOrders(config),
            Fulfillments(config),
            InventoryItems(config),
            InventoryLevels(config),
            Locations(config),
            MetafieldArticles(config),
            MetafieldBlogs(config),
            MetafieldCollections(config),
            MetafieldCustomers(config),
            MetafieldDraftOrders(config),
            MetafieldLocations(config),
            MetafieldOrders(config),
            MetafieldPages(config),
            MetafieldProductImages(config),
            MetafieldProducts(config),
            MetafieldProductVariants(config),
            MetafieldShops(config),
            MetafieldSmartCollections(config),
            OrderAgreements(config),
            OrderRefunds(config),
            OrderRisks(config),
            Orders(config),
            Pages(config),
            PriceRules(config),
            ProductImages(config),
            Products(config),
            ProductsGraphQl(config),
            ProductVariants(config),
            Shop(config),
            SmartCollections(config),
            TenderTransactions(config),
            self.select_transactions_stream(config),
            CustomerSavedSearch(config),
            CustomerAddress(config),
            Countries(config),
        ]
        return [
            stream_instance for stream_instance in stream_instances if self.format_stream_name(stream_instance.name) in permitted_streams
        ]