diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5462461d8ab2..b1e76eec2589 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -683,7 +683,7 @@ - name: Paypal Transaction sourceDefinitionId: d913b0f2-cc51-4e55-a44c-8ba1697b9239 dockerRepository: airbyte/source-paypal-transaction - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.io/integrations/sources/paypal-transaction icon: paypal.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index b4db3d4ec3c2..2c23290c01ba 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6714,7 +6714,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-paypal-transaction:0.1.6" +- dockerImage: "airbyte/source-paypal-transaction:0.1.7" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/paypal-transactions" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-paypal-transaction/Dockerfile b/airbyte-integrations/connectors/source-paypal-transaction/Dockerfile index 9b4e721e9204..85a3cff58163 100644 --- a/airbyte-integrations/connectors/source-paypal-transaction/Dockerfile +++ b/airbyte-integrations/connectors/source-paypal-transaction/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-paypal-transaction diff --git a/airbyte-integrations/connectors/source-paypal-transaction/setup.py b/airbyte-integrations/connectors/source-paypal-transaction/setup.py index a6daf3f0f24f..8babc8c122b5 100644 --- a/airbyte-integrations/connectors/source-paypal-transaction/setup.py +++ b/airbyte-integrations/connectors/source-paypal-transaction/setup.py @@ -12,6 +12,7 @@ TEST_REQUIREMENTS = [ "pytest~=6.1", "pytest-mock~=3.6", + "requests-mock", "source-acceptance-test", ] diff --git a/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/schemas/TODO.md b/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/schemas/TODO.md deleted file mode 100644 index cf1efadb3c9c..000000000000 --- a/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/schemas/TODO.md +++ /dev/null @@ -1,25 +0,0 @@ -# TODO: Define your stream schemas -Your connector must describe the schema of each stream it can output using [JSONSchema](https://json-schema.org). - -The simplest way to do this is to describe the schema of your streams using one `.json` file per stream. You can also dynamically generate the schema of your stream in code, or you can combine both approaches: start with a `.json` file and dynamically add properties to it. - -The schema of a stream is the return value of `Stream.get_json_schema`. - -## Static schemas -By default, `Stream.get_json_schema` reads a `.json` file in the `schemas/` directory whose name is equal to the value of the `Stream.name` property. In turn `Stream.name` by default returns the name of the class in snake case. Therefore, if you have a class `class EmployeeBenefits(HttpStream)` the default behavior will look for a file called `schemas/employee_benefits.json`. You can override any of these behaviors as you need. - -Important note: any objects referenced via `$ref` should be placed in the `shared/` directory in their own `.json` files. - -## Dynamic schemas -If you'd rather define your schema in code, override `Stream.get_json_schema` in your stream class to return a `dict` describing the schema using [JSONSchema](https://json-schema.org). - -## Dynamically modifying static schemas -Override `Stream.get_json_schema` to run the default behavior, edit the returned value, then return the edited value: -``` -def get_json_schema(self): - schema = super().get_json_schema() - schema['dynamically_determined_property'] = "property" - return schema -``` - -Delete this file once you're done. Or don't. Up to you :) diff --git a/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/source.py b/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/source.py index 74c4466831df..bfd5724e1067 100644 --- a/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/source.py +++ b/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/source.py @@ -10,6 +10,7 @@ from typing import Any, Callable, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream @@ -17,6 +18,8 @@ from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from dateutil.parser import isoparse +from .utils import middle_date_slices + class PaypalHttpException(Exception): """HTTPError Exception with detailed info""" @@ -28,16 +31,20 @@ def __str__(self): message = repr(self.error) if self.error.response.content: - content = self.error.response.content.decode() - try: - details = json.loads(content) - except json.decoder.JSONDecodeError: - details = content - + details = self.error_message() message = f"{message} Details: {details}" return message + def error_message(self): + content = self.error.response.content.decode() + try: + details = json.loads(content) + except json.decoder.JSONDecodeError: + details = content + + return details + def __repr__(self): return self.__str__() @@ -72,7 +79,7 @@ class PaypalTransactionStream(HttpStream, ABC): # API limit: (now() - start_date_min) <= start_date <= end_date <= last_refreshed_datetime <= now start_date_min: Mapping[str, int] = {"days": 3 * 365} # API limit - 3 years last_refreshed_datetime: Optional[datetime] = None # extracted from API response. Indicate the most resent possible start_date - stream_slice_period: Mapping[str, int] = {"days": 1} # max period is 31 days (API limit) + stream_slice_period: Mapping[str, int] = {"days": 15} # max period is 31 days (API limit) requests_per_minute: int = 30 # API limit is 50 reqs/min from 1 IP to all endpoints, otherwise IP is banned for 5 mins @@ -179,6 +186,11 @@ def get_field(record: Mapping[str, Any], field_path: Union[List[str], str]): return data + @staticmethod + def max_records_in_response_reached(exception: Exception, **kwargs): + message = exception.error_message() + return message.get("name") == "RESULTSET_TOO_LARGE" + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, any]: # This method is called once for each record returned from the API to compare the cursor field value in that record with the current state # we then return an updated state object. If this is the first time we run a sync or no state was passed, current_stream_state will be None. @@ -253,6 +265,59 @@ def stream_slices( return slices + def _prepared_request( + self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, next_page_token: Optional[dict] = None + ): + request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + request = self._create_prepared_request( + path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + headers=dict(request_headers, **self.authenticator.get_auth_header()), + params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + ) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) + + return request, request_kwargs + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + stream_state = stream_state or {} + pagination_complete = False + next_page_token = None + while not pagination_complete: + request, request_kwargs = self._prepared_request( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) + + try: + response = self._send_request(request, request_kwargs) + except PaypalHttpException as exception: + if self.max_records_in_response_reached(exception): + date_slices = middle_date_slices(stream_slice) + if date_slices: + for date_slice in date_slices: + yield from self.read_records( + sync_mode, cursor_field=cursor_field, stream_slice=date_slice, stream_state=stream_state + ) + break + else: + raise exception + + yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) + + next_page_token = self.next_page_token(response) + if not next_page_token: + pagination_complete = True + + # Always return an empty generator just in case no records were ever yielded + yield from [] + def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> requests.Response: try: return super()._send_request(request, request_kwargs) diff --git a/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/utils.py b/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/utils.py new file mode 100644 index 000000000000..ad6c0232dce4 --- /dev/null +++ b/airbyte-integrations/connectors/source-paypal-transaction/source_paypal_transaction/utils.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from datetime import datetime + + +def to_datetime_str(date: datetime) -> datetime: + """ + Returns the formated datetime string. + :: Output example: '2021-07-15T0:0:0+00:00' FORMAT : "%Y-%m-%dT%H:%M:%S%z" + """ + return datetime.strptime(date, "%Y-%m-%dT%H:%M:%S%z") + + +def middle_date_slices(stream_slice): + """Returns the mid-split datetime slices.""" + start_date, end_date = to_datetime_str(stream_slice["start_date"]), to_datetime_str(stream_slice["end_date"]) + if start_date < end_date: + middle_date = start_date + (end_date - start_date) / 2 + return [ + { + "start_date": start_date.isoformat(), + "end_date": middle_date.isoformat(), + }, + { + "start_date": middle_date.isoformat(), + "end_date": end_date.isoformat(), + }, + ] diff --git a/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/transaction.json b/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/transaction.json new file mode 100644 index 000000000000..00621a91e6d1 --- /dev/null +++ b/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/transaction.json @@ -0,0 +1,151 @@ +{ + "transaction_details": [ + { + "transaction_info": { + "paypal_account_id": "6STWC2LSUYYYE", + "transaction_id": "5TY05013RG002845M", + "transaction_event_code": "T0006", + "transaction_initiation_date": "2014-07-11T04:03:52+0000", + "transaction_updated_date": "2014-07-11T04:03:52+0000", + "transaction_amount": { + "currency_code": "USD", + "value": "465.00" + }, + "fee_amount": { + "currency_code": "USD", + "value": "-13.79" + }, + "insurance_amount": { + "currency_code": "USD", + "value": "15.00" + }, + "shipping_amount": { + "currency_code": "USD", + "value": "30.00" + }, + "shipping_discount_amount": { + "currency_code": "USD", + "value": "10.00" + }, + "transaction_status": "S", + "transaction_subject": "Bill for your purchase", + "transaction_note": "Check out the latest sales", + "invoice_id": "Invoice-005", + "custom_field": "Thank you for your business", + "protection_eligibility": "01" + }, + "payer_info": { + "account_id": "6STWC2LSUYYYE", + "email_address": "consumer@example.com", + "address_status": "Y", + "payer_status": "Y", + "payer_name": { + "given_name": "test", + "surname": "consumer", + "alternate_full_name": "test consumer" + }, + "country_code": "US" + }, + "shipping_info": { + "name": "Sowmith", + "address": { + "line1": "Eco Space, bellandur", + "line2": "OuterRingRoad", + "city": "Bangalore", + "country_code": "IN", + "postal_code": "560103" + } + }, + "cart_info": { + "item_details": [ + { + "item_code": "ItemCode-1", + "item_name": "Item1 - radio", + "item_description": "Radio", + "item_quantity": "2", + "item_unit_price": { + "currency_code": "USD", + "value": "50.00" + }, + "item_amount": { + "currency_code": "USD", + "value": "100.00" + }, + "tax_amounts": [ + { + "tax_amount": { + "currency_code": "USD", + "value": "20.00" + } + } + ], + "total_item_amount": { + "currency_code": "USD", + "value": "120.00" + }, + "invoice_number": "Invoice-005" + }, + { + "item_code": "ItemCode-2", + "item_name": "Item2 - Headset", + "item_description": "Headset", + "item_quantity": "3", + "item_unit_price": { + "currency_code": "USD", + "value": "100.00" + }, + "item_amount": { + "currency_code": "USD", + "value": "300.00" + }, + "tax_amounts": [ + { + "tax_amount": { + "currency_code": "USD", + "value": "60.00" + } + } + ], + "total_item_amount": { + "currency_code": "USD", + "value": "360.00" + }, + "invoice_number": "Invoice-005" + }, + { + "item_name": "3", + "item_quantity": "1", + "item_unit_price": { + "currency_code": "USD", + "value": "-50.00" + }, + "item_amount": { + "currency_code": "USD", + "value": "-50.00" + }, + "total_item_amount": { + "currency_code": "USD", + "value": "-50.00" + }, + "invoice_number": "Invoice-005" + } + ] + }, + "store_info": {}, + "auction_info": {}, + "incentive_info": {} + } + ], + "account_number": "XZXSPECPDZHZU", + "last_refreshed_datetime": "2017-01-02T06:59:59+0000", + "page": 1, + "total_items": 1, + "total_pages": 1, + "links": [ + { + "href": "https://api-m.sandbox.paypal.com/v1/reporting/transactions?transaction_id=5TY05013RG002845M&fields=all&page_size=100&page=1", + "rel": "self", + "method": "GET" + } + ] +} diff --git a/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/unit_test.py index 0b2b455389bf..a784b1122393 100644 --- a/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-paypal-transaction/unit_tests/unit_test.py @@ -2,11 +2,13 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +import json +import pathlib from datetime import datetime, timedelta from airbyte_cdk.sources.streams.http.auth import NoAuth from dateutil.parser import isoparse -from pytest import fixture +from pytest import fixture, raises from source_paypal_transaction.source import Balances, PaypalTransactionStream, Transactions @@ -16,8 +18,15 @@ def time_sleep_mock(mocker): yield time_mock -def test_get_field(): +@fixture(autouse=True) +def transactions(request): + file = pathlib.Path(request.node.fspath.strpath) + transaction = file.with_name("transaction.json") + with transaction.open() as fp: + return json.load(fp) + +def test_get_field(): record = {"a": {"b": {"c": "d"}}} # Test expected result - field_path is a list assert "d" == PaypalTransactionStream.get_field(record, field_path=["a", "b", "c"]) @@ -67,7 +76,6 @@ def now(): def test_transactions_stream_slices(): - start_date_max = {"hours": 0} # if start_date > now - **start_date_max then no slices @@ -109,6 +117,7 @@ def test_transactions_stream_slices(): start_date=now() - timedelta(**start_date_max) - timedelta(days=1), ) transactions.get_last_refreshed_datetime = lambda x: None + transactions.stream_slice_period = {"days": 1} stream_slices = transactions.stream_slices(sync_mode="any") assert 2 == len(stream_slices) @@ -117,6 +126,7 @@ def test_transactions_stream_slices(): start_date=now() - timedelta(**start_date_max) - timedelta(days=1, hours=2), ) transactions.get_last_refreshed_datetime = lambda x: None + transactions.stream_slice_period = {"days": 1} stream_slices = transactions.stream_slices(sync_mode="any") assert 2 == len(stream_slices) @@ -125,6 +135,7 @@ def test_transactions_stream_slices(): start_date=now() - timedelta(**start_date_max) - timedelta(days=30, minutes=1), ) transactions.get_last_refreshed_datetime = lambda x: None + transactions.stream_slice_period = {"days": 1} stream_slices = transactions.stream_slices(sync_mode="any") assert 31 == len(stream_slices) @@ -135,6 +146,7 @@ def test_transactions_stream_slices(): end_date=isoparse("2021-06-04T12:00:00+00:00"), ) transactions.get_last_refreshed_datetime = lambda x: None + transactions.stream_slice_period = {"days": 1} stream_slices = transactions.stream_slices(sync_mode="any") assert [ {"start_date": "2021-06-01T10:00:00+00:00", "end_date": "2021-06-02T10:00:00+00:00"}, @@ -150,6 +162,7 @@ def test_transactions_stream_slices(): end_date=isoparse("2021-06-04T12:00:00+00:00"), ) transactions.get_last_refreshed_datetime = lambda x: None + transactions.stream_slice_period = {"days": 1} stream_slices = transactions.stream_slices(sync_mode="any", stream_state={"date": "2021-06-02T10:00:00+00:00"}) assert [ {"start_date": "2021-06-02T10:00:00+00:00", "end_date": "2021-06-03T10:00:00+00:00"}, @@ -197,6 +210,7 @@ def test_balances_stream_slices(): start_date=now - timedelta(days=1), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any") assert 2 == len(stream_slices) @@ -205,6 +219,7 @@ def test_balances_stream_slices(): start_date=now - timedelta(days=1, minutes=1), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any") assert 2 == len(stream_slices) @@ -215,6 +230,7 @@ def test_balances_stream_slices(): end_date=isoparse("2021-06-03T12:00:00+00:00"), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any") assert [ {"start_date": "2021-06-01T10:00:00+00:00", "end_date": "2021-06-02T10:00:00+00:00"}, @@ -229,6 +245,7 @@ def test_balances_stream_slices(): end_date=isoparse("2021-06-03T12:00:00+00:00"), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any", stream_state={"date": "2021-06-02T10:00:00+00:00"}) assert [ {"start_date": "2021-06-02T10:00:00+00:00", "end_date": "2021-06-03T10:00:00+00:00"}, @@ -241,6 +258,7 @@ def test_balances_stream_slices(): end_date=isoparse("2021-06-03T12:00:00+00:00"), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any", stream_state={"date": "2021-06-03T11:00:00+00:00"}) assert [{"start_date": "2021-06-03T11:00:00+00:00", "end_date": "2021-06-03T12:00:00+00:00"}] == stream_slices @@ -250,5 +268,44 @@ def test_balances_stream_slices(): end_date=isoparse("2021-06-03T12:00:00+00:00"), ) balance.get_last_refreshed_datetime = lambda x: None + balance.stream_slice_period = {"days": 1} stream_slices = balance.stream_slices(sync_mode="any", stream_state={"date": "2021-06-03T12:00:00+00:00"}) assert [{"start_date": "2021-06-03T12:00:00+00:00", "end_date": "2021-06-03T12:00:00+00:00"}] == stream_slices + + +def test_max_records_in_response_reached(transactions, requests_mock): + balance = Transactions( + authenticator=NoAuth(), + start_date=isoparse("2021-07-01T10:00:00+00:00"), + end_date=isoparse("2021-07-29T12:00:00+00:00"), + ) + error_message = { + "name": "RESULTSET_TOO_LARGE", + "message": "Result set size is greater than the maximum limit. Change the filter " "criteria and try again.", + } + url = "https://api-m.paypal.com/v1/reporting/transactions" + + requests_mock.register_uri( + "GET", + url + "?start_date=2021-07-01T12%3A00%3A00%2B00%3A00&end_date=2021-07-29T12%3A00%3A00%2B00%3A00", + json=error_message, + status_code=400, + ) + requests_mock.register_uri( + "GET", url + "?start_date=2021-07-01T12%3A00%3A00%2B00%3A00&end_date=2021-07-15T12%3A00%3A00%2B00%3A00", json=transactions + ) + requests_mock.register_uri( + "GET", url + "?start_date=2021-07-15T12%3A00%3A00%2B00%3A00&end_date=2021-07-29T12%3A00%3A00%2B00%3A00", json=transactions + ) + month_date_slice = {"start_date": "2021-07-01T12:00:00+00:00", "end_date": "2021-07-29T12:00:00+00:00"} + assert len(list(balance.read_records(sync_mode="any", stream_slice=month_date_slice))) == 2 + + requests_mock.register_uri( + "GET", + url + "?start_date=2021-07-01T12%3A00%3A00%2B00%3A00&end_date=2021-07-01T12%3A00%3A00%2B00%3A00", + json=error_message, + status_code=400, + ) + one_day_slice = {"start_date": "2021-07-01T12:00:00+00:00", "end_date": "2021-07-01T12:00:00+00:00"} + with raises(Exception): + assert next(balance.read_records(sync_mode="any", stream_slice=one_day_slice)) diff --git a/docs/integrations/sources/paypal-transaction.md b/docs/integrations/sources/paypal-transaction.md index 2abb9e31a915..32c0b71fe6a8 100644 --- a/docs/integrations/sources/paypal-transaction.md +++ b/docs/integrations/sources/paypal-transaction.md @@ -57,11 +57,12 @@ Transactions sync is performed with default `stream_slice_period` = 1 day, it me | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------| +| 0.1.7 | 2022-07-18 | [14804](https://github.com/airbytehq/airbyte/pull/14804) | Adding `RESULTSET_TOO_LARGE` error validation | | 0.1.6 | 2022-06-10 | [13682](https://github.com/airbytehq/airbyte/pull/13682) | Update paypal transaction schema | | 0.1.5 | 2022-04-27 | [12335](https://github.com/airbytehq/airbyte/pull/12335) | Adding fixtures to mock time.sleep for connectors that explicitly sleep | | 0.1.4 | 2021-12-22 | [9034](https://github.com/airbytehq/airbyte/pull/9034) | Update connector fields title/description | | 0.1.3 | 2021-12-16 | [8580](https://github.com/airbytehq/airbyte/pull/8580) | Added more logs during `check connection` stage | | 0.1.2 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | -| 0.1.1 | 2021-08-03 | [5155](https://github.com/airbytehq/airbyte/pull/5155) | fix start\_date\_min limit | +| 0.1.1 | 2021-08-03 | [5155](https://github.com/airbytehq/airbyte/pull/5155) | Fix start\_date\_min limit | | 0.1.0 | 2021-06-10 | [4240](https://github.com/airbytehq/airbyte/pull/4240) | PayPal Transaction Search API |