Skip to content

Commit

Permalink
🎉 Source Paypal Transation: RESULTSET_TOO_LARGE validation (#14804)
Browse files Browse the repository at this point in the history
* Updated API version from v9 to v11

* Updated PR number

* Updated after review

* Added validation error

* Fixed to linter

* Updated PR number

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
lazebnyi and octavia-squidington-iii authored Jul 19, 2022
1 parent 352dd51 commit 037d634
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@
- name: Paypal Transaction
sourceDefinitionId: d913b0f2-cc51-4e55-a44c-8ba1697b9239
dockerRepository: airbyte/source-paypal-transaction
dockerImageTag: 0.1.6
dockerImageTag: 0.1.7
documentationUrl: https://docs.airbyte.io/integrations/sources/paypal-transaction
icon: paypal.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6714,7 +6714,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-paypal-transaction:0.1.6"
- dockerImage: "airbyte/source-paypal-transaction:0.1.7"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/paypal-transactions"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.name=airbyte/source-paypal-transaction
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TEST_REQUIREMENTS = [
"pytest~=6.1",
"pytest-mock~=3.6",
"requests-mock",
"source-acceptance-test",
]

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union

import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, Oauth2Authenticator
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from dateutil.parser import isoparse

from .utils import middle_date_slices


class PaypalHttpException(Exception):
"""HTTPError Exception with detailed info"""
Expand All @@ -28,16 +31,20 @@ def __str__(self):
message = repr(self.error)

if self.error.response.content:
content = self.error.response.content.decode()
try:
details = json.loads(content)
except json.decoder.JSONDecodeError:
details = content

details = self.error_message()
message = f"{message} Details: {details}"

return message

def error_message(self):
content = self.error.response.content.decode()
try:
details = json.loads(content)
except json.decoder.JSONDecodeError:
details = content

return details

def __repr__(self):
return self.__str__()

Expand Down Expand Up @@ -72,7 +79,7 @@ class PaypalTransactionStream(HttpStream, ABC):
# API limit: (now() - start_date_min) <= start_date <= end_date <= last_refreshed_datetime <= now
start_date_min: Mapping[str, int] = {"days": 3 * 365} # API limit - 3 years
last_refreshed_datetime: Optional[datetime] = None # extracted from API response. Indicate the most resent possible start_date
stream_slice_period: Mapping[str, int] = {"days": 1} # max period is 31 days (API limit)
stream_slice_period: Mapping[str, int] = {"days": 15} # max period is 31 days (API limit)

requests_per_minute: int = 30 # API limit is 50 reqs/min from 1 IP to all endpoints, otherwise IP is banned for 5 mins

Expand Down Expand Up @@ -179,6 +186,11 @@ def get_field(record: Mapping[str, Any], field_path: Union[List[str], str]):

return data

@staticmethod
def max_records_in_response_reached(exception: Exception, **kwargs):
message = exception.error_message()
return message.get("name") == "RESULTSET_TOO_LARGE"

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]:
# This method is called once for each record returned from the API to compare the cursor field value in that record with the current state
# we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None.
Expand Down Expand Up @@ -253,6 +265,59 @@ def stream_slices(

return slices

def _prepared_request(
self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Optional[dict] = None
):
request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
request = self._create_prepared_request(
path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token),
)
request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)

return request, request_kwargs

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
pagination_complete = False
next_page_token = None
while not pagination_complete:
request, request_kwargs = self._prepared_request(
stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token
)

try:
response = self._send_request(request, request_kwargs)
except PaypalHttpException as exception:
if self.max_records_in_response_reached(exception):
date_slices = middle_date_slices(stream_slice)
if date_slices:
for date_slice in date_slices:
yield from self.read_records(
sync_mode, cursor_field=cursor_field, stream_slice=date_slice, stream_state=stream_state
)
break
else:
raise exception

yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice)

next_page_token = self.next_page_token(response)
if not next_page_token:
pagination_complete = True

# Always return an empty generator just in case no records were ever yielded
yield from []

def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response:
try:
return super()._send_request(request, request_kwargs)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from datetime import datetime


def to_datetime_str(date: datetime) -> datetime:
"""
Returns the formated datetime string.
:: Output example: '2021-07-15T0:0:0+00:00' FORMAT : "%Y-%m-%dT%H:%M:%S%z"
"""
return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z")


def middle_date_slices(stream_slice):
"""Returns the mid-split datetime slices."""
start_date, end_date = to_datetime_str(stream_slice["start_date"]), to_datetime_str(stream_slice["end_date"])
if start_date < end_date:
middle_date = start_date + (end_date - start_date) / 2
return [
{
"start_date": start_date.isoformat(),
"end_date": middle_date.isoformat(),
},
{
"start_date": middle_date.isoformat(),
"end_date": end_date.isoformat(),
},
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
{
"transaction_details": [
{
"transaction_info": {
"paypal_account_id": "6STWC2LSUYYYE",
"transaction_id": "5TY05013RG002845M",
"transaction_event_code": "T0006",
"transaction_initiation_date": "2014-07-11T04:03:52+0000",
"transaction_updated_date": "2014-07-11T04:03:52+0000",
"transaction_amount": {
"currency_code": "USD",
"value": "465.00"
},
"fee_amount": {
"currency_code": "USD",
"value": "-13.79"
},
"insurance_amount": {
"currency_code": "USD",
"value": "15.00"
},
"shipping_amount": {
"currency_code": "USD",
"value": "30.00"
},
"shipping_discount_amount": {
"currency_code": "USD",
"value": "10.00"
},
"transaction_status": "S",
"transaction_subject": "Bill for your purchase",
"transaction_note": "Check out the latest sales",
"invoice_id": "Invoice-005",
"custom_field": "Thank you for your business",
"protection_eligibility": "01"
},
"payer_info": {
"account_id": "6STWC2LSUYYYE",
"email_address": "consumer@example.com",
"address_status": "Y",
"payer_status": "Y",
"payer_name": {
"given_name": "test",
"surname": "consumer",
"alternate_full_name": "test consumer"
},
"country_code": "US"
},
"shipping_info": {
"name": "Sowmith",
"address": {
"line1": "Eco Space, bellandur",
"line2": "OuterRingRoad",
"city": "Bangalore",
"country_code": "IN",
"postal_code": "560103"
}
},
"cart_info": {
"item_details": [
{
"item_code": "ItemCode-1",
"item_name": "Item1 - radio",
"item_description": "Radio",
"item_quantity": "2",
"item_unit_price": {
"currency_code": "USD",
"value": "50.00"
},
"item_amount": {
"currency_code": "USD",
"value": "100.00"
},
"tax_amounts": [
{
"tax_amount": {
"currency_code": "USD",
"value": "20.00"
}
}
],
"total_item_amount": {
"currency_code": "USD",
"value": "120.00"
},
"invoice_number": "Invoice-005"
},
{
"item_code": "ItemCode-2",
"item_name": "Item2 - Headset",
"item_description": "Headset",
"item_quantity": "3",
"item_unit_price": {
"currency_code": "USD",
"value": "100.00"
},
"item_amount": {
"currency_code": "USD",
"value": "300.00"
},
"tax_amounts": [
{
"tax_amount": {
"currency_code": "USD",
"value": "60.00"
}
}
],
"total_item_amount": {
"currency_code": "USD",
"value": "360.00"
},
"invoice_number": "Invoice-005"
},
{
"item_name": "3",
"item_quantity": "1",
"item_unit_price": {
"currency_code": "USD",
"value": "-50.00"
},
"item_amount": {
"currency_code": "USD",
"value": "-50.00"
},
"total_item_amount": {
"currency_code": "USD",
"value": "-50.00"
},
"invoice_number": "Invoice-005"
}
]
},
"store_info": {},
"auction_info": {},
"incentive_info": {}
}
],
"account_number": "XZXSPECPDZHZU",
"last_refreshed_datetime": "2017-01-02T06:59:59+0000",
"page": 1,
"total_items": 1,
"total_pages": 1,
"links": [
{
"href": "https://api-m.sandbox.paypal.com/v1/reporting/transactions?transaction_id=5TY05013RG002845M&fields=all&page_size=100&page=1",
"rel": "self",
"method": "GET"
}
]
}
Loading

0 comments on commit 037d634

Please sign in to comment.