Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source Cart: fixing of testing for a lot of data #5465

Merged
merged 12 commits into from
Aug 26, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "bb1a6d31-6879-4819-a2bd-3eed299ea8e2",
"name": "Cart.com",
"dockerRepository": "airbyte/source-cart",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/cart"
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@
- sourceDefinitionId: bb1a6d31-6879-4819-a2bd-3eed299ea8e2
name: Cart.com
dockerRepository: airbyte/source-cart
dockerImageTag: 0.1.1
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/cart
- sourceDefinitionId: d60a46d4-709f-4092-a6b7-2457f7d455f5
name: Prestashop
Expand Down
23 changes: 16 additions & 7 deletions airbyte-integrations/connectors/source-cart/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
FROM python:3.7-slim
FROM python:3.7.11-alpine3.14 as base
FROM base as builder

# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*

RUN apk --no-cache upgrade \
&& pip install --upgrade pip

WORKDIR /airbyte/integration_code
COPY source_cart ./source_cart
COPY main.py ./
COPY setup.py ./
RUN pip install .
RUN pip install --prefix=/install .


FROM base
COPY --from=builder /install /usr/local

WORKDIR /airbyte/integration_code
COPY main.py ./
COPY source_cart ./source_cart


ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-cart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2)
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-cart/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk",
"airbyte-cdk~=0.1.7",
]

TEST_REQUIREMENTS = [
Expand Down
37 changes: 34 additions & 3 deletions airbyte-integrations/connectors/source-cart/source_cart/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
# SOFTWARE.
#

from functools import wraps
from typing import Any, List, Mapping, Tuple

import pendulum
import requests
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator
from pendulum.parsing.exceptions import ParserError

from .streams import CustomersCart, OrderItems, OrderPayments, Orders, Products

Expand All @@ -44,10 +46,33 @@ def get_auth_header(self) -> Mapping[str, Any]:


class SourceCart(AbstractSource):
def validate_config_values(func):
"""Check input config values for check_connection and stream functions. It will raise an exception if there is an parsing error"""

@wraps(func)
def decorator(self_, *args, **kwargs):
for arg in args:
if isinstance(arg, Mapping):
try:
# parse date strings by the pendulum library. It will raise the exception ParserError if it is some format mistakes.
pendulum.parse(arg["start_date"])
# try to check an end_date value. It can be ussed for different CI tests
end_date = arg.get("end_date")
if end_date:
pendulum.parse(end_date)
except ParserError as e:
raise Exception(f"{str(e)}. Example: 2021-01-01T00:00:00Z")
break

return func(self_, *args, **kwargs)

return decorator

@validate_config_values
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
try:
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
pendulum.parse(config["start_date"])

stream = Products(authenticator=authenticator, start_date=config["start_date"], store_name=config["store_name"])
records = stream.read_records(sync_mode=SyncMode.full_refresh)
next(records)
Expand All @@ -60,7 +85,13 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) ->
return False, err_message
return False, repr(e)

@validate_config_values
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = CustomHeaderAuthenticator(access_token=config["access_token"])
args = {"authenticator": authenticator, "start_date": config["start_date"], "store_name": config["store_name"]}
args = {
"authenticator": authenticator,
"start_date": config["start_date"],
"store_name": config["store_name"],
"end_date": config.get("end_date"),
}
return [CustomersCart(**args), Orders(**args), OrderPayments(**args), OrderItems(**args), Products(**args)]
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@
"airbyte_secret": true,
"description": "API Key. See the <a href=\"https://docs.airbyte.io/integrations/sources/mailchimp\">docs</a> for information on how to generate this key."
},
"store_name": {
"type": "string",
"description": "Store name. All API URLs start with https://[mystorename.com]/api/v1/, where [mystorename.com] is the domain name of your store."
},
"start_date": {
"title": "Start Date",
"type": "string",
"description": "The date from which you'd like to replicate the data",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2021-01-01T00:00:00Z"]
},
"store_name": {
"type": "string",
"description": "Store name. All API URLs start with https://[mystorename.com]/api/v1/, where [mystorename.com] is the domain name of your store."
}
}
}
Expand Down
50 changes: 26 additions & 24 deletions airbyte-integrations/connectors/source-cart/source_cart/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#

import urllib.parse
from abc import ABC, abstractmethod
from abc import ABC
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
Expand All @@ -33,8 +33,9 @@
class CartStream(HttpStream, ABC):
primary_key = "id"

def __init__(self, start_date: str, store_name: str, **kwargs):
def __init__(self, start_date: str, store_name: str, end_date: str = None, **kwargs):
self._start_date = start_date
self._end_date = end_date
antixar marked this conversation as resolved.
Show resolved Hide resolved
self.store_name = store_name
super().__init__(**kwargs)

Expand All @@ -43,9 +44,15 @@ def url_base(self) -> str:
return f"https://{self.store_name}/api/v1/"

@property
@abstractmethod
def data_field() -> str:
"""Field of the response containing data"""
def data_field(self) -> str:
"""
Field of the response containing data.
By default the value self.name will be used if this property is empty or None
"""
return None

def path(self, **kwargs) -> str:
return self.name

def backoff_time(self, response: requests.Response) -> Optional[float]:
"""
Expand All @@ -57,6 +64,7 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
response_json = response.json()

if response_json.get("next_page"):
next_query_string = urllib.parse.urlsplit(response_json.get("next_page")).query
params = dict(urllib.parse.parse_qsl(next_query_string))
Expand All @@ -67,7 +75,7 @@ def request_headers(self, **kwargs) -> Mapping[str, Any]:

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
response_json = response.json()
result = response_json.get(self.data_field, [])
result = response_json.get(self.data_field or self.name, [])
yield from result

def request_params(
Expand All @@ -85,10 +93,20 @@ class IncrementalCartStream(CartStream, ABC):
cursor_field = "updated_at"

def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
"""
Generates a query for incremental logic

Docs: https://developers.cart.com/docs/rest-api/docs/query_syntax.md
"""
params = super().request_params(stream_state=stream_state, **kwargs)
cursor_value = stream_state.get(self.cursor_field) or self._start_date
params["sort"] = self.cursor_field
params[self.cursor_field] = f"gt:{max(cursor_value, self._start_date)}"
start_date = max(cursor_value, self._start_date)
query = f"gt:{start_date}"
if self._end_date and self._end_date > start_date:
query += f" AND lt:{self._end_date}"

params[self.cursor_field] = query
return params

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
Expand All @@ -111,19 +129,14 @@ class CustomersCart(IncrementalCartStream):
data_field = "customers"

def path(self, **kwargs) -> str:
return "customers"
return self.data_field


class Orders(IncrementalCartStream):
"""
Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1orders/get
"""

data_field = "orders"

def path(self, **kwargs) -> str:
return "orders"


class OrderPayments(IncrementalCartStream):
"""
Expand All @@ -132,9 +145,6 @@ class OrderPayments(IncrementalCartStream):

data_field = "payments"

def path(self, **kwargs) -> str:
return "order_payments"


class OrderItems(IncrementalCartStream):
"""
Expand All @@ -143,16 +153,8 @@ class OrderItems(IncrementalCartStream):

data_field = "items"

def path(self, **kwargs) -> str:
return "order_items"


class Products(IncrementalCartStream):
"""
Docs: https://developers.cart.com/docs/rest-api/restapi.json/paths/~1products/get
"""

data_field = "products"

def path(self, **kwargs) -> str:
return "products"
1 change: 1 addition & 0 deletions docs/integrations/sources/cart.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ Please follow these [steps](https://developers.cart.com/docs/rest-api/docs/READM

| Version | Date | Pull Request | Subject |
| :------ | :-------- | :----- | :------ |
| 0.1.3 | 2021-08-26 | [5465](https://github.com/airbytehq/airbyte/pull/5465) | Add the end_date option for limitation of the amount of synced data|
| 0.1.2 | 2021-08-23 | [1111](https://github.com/airbytehq/airbyte/pull/1111) | Add `order_items` stream |
| 0.1.0 | 2021-06-08 | [4574](https://github.com/airbytehq/airbyte/pull/4574) | Initial Release |