Skip to content

Commit

Permalink
🎉 Source Amazon Seller Partner: Add new streams (#13604)
Browse files Browse the repository at this point in the history
* add new streams

* update connector version in Dockerfile and docs

* fix end_date if is None and save reportid in unique records

* WIP: add test for slices in settlement reports

* auto-bump connector version

* fix: formatting

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Harshith Mullapudi <harshithmullapudi@gmail.com>
  • Loading branch information
3 people authored Jul 15, 2022
1 parent 25ee6f6 commit 00c3ec7
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
- name: Amazon Seller Partner
sourceDefinitionId: e55879a8-0ef8-4557-abcf-ab34c53ec460
dockerRepository: airbyte/source-amazon-seller-partner
dockerImageTag: 0.2.22
dockerImageTag: 0.2.23
sourceType: api
documentationUrl: https://docs.airbyte.io/integrations/sources/amazon-seller-partner
icon: amazonsellerpartner.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@
type: "string"
path_in_connector_config:
- "client_secret"
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.22"
- dockerImage: "airbyte/source-amazon-seller-partner:0.2.23"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
changelogUrl: "https://docs.airbyte.io/integrations/sources/amazon-seller-partner"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.2.22
LABEL io.airbyte.version=0.2.23
LABEL io.airbyte.name=airbyte/source-amazon-seller-partner
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"title": "FBA customer returns",
"description": "FBA customer returns report Data Reports",
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"return-date": { "type": ["null", "string"] },
"order-id": { "type": ["null", "string"] },
"sku": { "type": ["null", "string"] },
"asin": { "type": ["null", "string"] },
"fnsku": { "type": ["null", "string"] },
"product-name": { "type": ["null", "string"] },
"quantity": { "type": ["null", "string"] },
"fulfillment-center-id": { "type": ["null", "string"] },
"detailed-disposition": { "type": ["null", "string"] },
"reason": { "type": ["null", "string"] },
"status": { "type": ["null", "string"] },
"license-plate-number": { "type": ["null", "string"] },
"customer-comments": { "type": ["null", "string"] }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"title": "Flat File Settlement Reports",
"description": "",
"type": "object",
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"settlement-id": { "type": ["null", "string"] },
"settlement-start-date": { "type": ["null", "string"] },
"settlement-end-date": { "type": ["null", "string"] },
"deposit-date": { "type": ["null", "string"] },
"total-amount": { "type": ["null", "string"] },
"currency": { "type": ["null", "string"] },
"transaction-type": { "type": ["null", "string"] },
"order-id": { "type": ["null", "string"] },
"merchant-order-id": { "type": ["null", "string"] },
"adjustment-id": { "type": ["null", "string"] },
"shipment-id": { "type": ["null", "string"] },
"marketplace-name": { "type": ["null", "string"] },
"shipment-fee-type": { "type": ["null", "string"] },
"shipment-fee-amount": { "type": ["null", "string"] },
"order-fee-type": { "type": ["null", "string"] },
"order-fee-amount": { "type": ["null", "string"] },
"fulfillment-id": { "type": ["null", "string"] },
"posted-date": { "type": ["null", "string"] },
"order-item-code": { "type": ["null", "string"] },
"merchant-order-item-id": { "type": ["null", "string"] },
"merchant-adjustment-item-id": { "type": ["null", "string"] },
"sku": { "type": ["null", "string"] },
"quantity-purchased": { "type": ["null", "string"] },
"price-type": { "type": ["null", "string"] },
"price-amount": { "type": ["null", "string"] },
"item-related-fee-type": { "type": ["null", "string"] },
"item-related-fee-amount": { "type": ["null", "string"] },
"misc-fee-amount": { "type": ["null", "string"] },
"other-fee-amount": { "type": ["null", "string"] },
"other-fee-reason-description": { "type": ["null", "string"] },
"promotion-id": { "type": ["null", "string"] },
"promotion-type": { "type": ["null", "string"] },
"promotion-amount": { "type": ["null", "string"] },
"direct-payment-type": { "type": ["null", "string"] },
"direct-payment-amount": { "type": ["null", "string"] },
"other-amount": { "type": ["null", "string"] },
"report_id": { "type": ["null", "string"] }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
BrandAnalyticsMarketBasketReports,
BrandAnalyticsRepeatPurchaseReports,
BrandAnalyticsSearchTermsReports,
FbaCustomerReturnsReports,
FbaInventoryReports,
FbaOrdersReports,
FbaReplacementsReports,
FbaShipmentsReports,
FlatFileOpenListingsReports,
FlatFileOrdersReports,
FlatFileOrdersReportsByLastUpdate,
FlatFileSettlementV2Reports,
FulfilledShipmentsReports,
GetXmlBrowseTreeData,
ListFinancialEventGroups,
Expand Down Expand Up @@ -117,13 +119,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
stream_kwargs = self._get_stream_kwargs(config)

return [
FbaCustomerReturnsReports(**stream_kwargs),
FbaInventoryReports(**stream_kwargs),
FbaOrdersReports(**stream_kwargs),
FbaShipmentsReports(**stream_kwargs),
FbaReplacementsReports(**stream_kwargs),
FlatFileOpenListingsReports(**stream_kwargs),
FlatFileOrdersReports(**stream_kwargs),
FlatFileOrdersReportsByLastUpdate(**stream_kwargs),
FlatFileSettlementV2Reports(**stream_kwargs),
FulfilledShipmentsReports(**stream_kwargs),
MerchantListingsReports(**stream_kwargs),
VendorDirectFulfillmentShipping(**stream_kwargs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,94 @@ def path(self, **kwargs) -> str:

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
yield from [response.json().get(self.data_field, {}).get("FinancialEvents", {})]


class FbaCustomerReturnsReports(ReportsAmazonSPStream):

name = "GET_FBA_FULFILLMENT_CUSTOMER_RETURNS_DATA"

def _report_data(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:
replication_start_date = pendulum.parse(self._replication_start_date)

data = {
"reportType": self.name,
"marketplaceIds": [self.marketplace_id],
"dataStartTime": replication_start_date.strftime(DATE_TIME_FORMAT),
}
return data


class FlatFileSettlementV2Reports(ReportsAmazonSPStream):

name = "GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE"

def _create_report(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Mapping[str, Any]:

# For backwards

return {"reportId": stream_slice.get("report_id")}

def stream_slices(
self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""
From https://developer-docs.amazon.com/sp-api/docs/report-type-values
documentation:
```Settlement reports cannot be requested or scheduled.
They are automatically scheduled by Amazon.
You can search for these reports using the getReports operation.
```
"""

strict_start_date = pendulum.now("utc").subtract(days=90)

create_date = max(pendulum.parse(self._replication_start_date), strict_start_date)
end_date = pendulum.parse(self._replication_end_date or pendulum.now("utc").date().to_date_string())

if end_date < strict_start_date:
end_date = pendulum.now("utc")

params = {
"reportTypes": self.name,
"pageSize": 100,
"createdSince": create_date.strftime(DATE_TIME_FORMAT),
"createdUntil": end_date.strftime(DATE_TIME_FORMAT),
}
unique_records = list()
complete = False

while not complete:

request_headers = self.request_headers()
get_reports = self._create_prepared_request(
http_method="GET",
path=f"{self.path_prefix}/reports",
headers=dict(request_headers, **self.authenticator.get_auth_header()),
params=params,
)
report_response = self._send_request(get_reports)
response = report_response.json()
data = response.get(self.data_field, list())

records = [e.get("reportId") for e in data if e and e.get("reportId") not in unique_records]
unique_records += records
reports = [{"report_id": report_id} for report_id in records]

yield from reports

next_value = response.get("nextToken", None)
params = {"nextToken": next_value}
if not next_value:
complete = True
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
import requests
from airbyte_cdk.models import SyncMode
from source_amazon_seller_partner.auth import AWSSignature
from source_amazon_seller_partner.streams import FlatFileSettlementV2Reports

START_DATE_1 = "2022-05-25T00:00:00Z"
END_DATE_1 = "2022-05-26T00:00:00Z"

generated_reports_from_amazon = {
"payload": [
{
"createdTime": "2022-07-08T10:39:31+00:00",
"dataEndTime": "2022-07-08T09:59:21+00:00",
"dataStartTime": "2022-06-27T08:01:32+00:00",
"marketplaceIds": [
"A1F83G8C2ARO7P",
"A1PA6795UKMFR9",
"A13V1IB3VIYZZH",
"AZMDEXL2RVFNN",
"A38D8NSA03LJTC",
"A1ZFFQZ3HTUKT9",
"APJ6JRA9NG5V4",
"A1RKKUPIHCS9HS",
"A62U237T8HV6N",
"AFQLKURYRPEL8",
"A1NYP31CE519TD",
"A1805IZSGTT6HS",
"A33AVAJ2PDY3EV",
"AMEN7PMS3EDWL",
"A2NODRKZP88ZB9",
"A1C3SOZRARQ6R3",
],
"processingEndTime": "2022-07-08T10:39:31+00:00",
"processingStartTime": "2022-07-08T10:39:31+00:00",
"processingStatus": "DONE",
"reportDocumentId": "amzn1.spdoc.1.3.0fcde1b1-a35e-4fe1-b077-38e0e9f65d63.T1SN9707N5X5IQ.0000",
"reportId": "85968019100",
"reportType": "GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE",
},
{
"createdTime": "2022-07-06T09:12:07+00:00",
"dataEndTime": "2022-07-06T08:38:16+00:00",
"dataStartTime": "2022-06-22T08:38:16+00:00",
"marketplaceIds": [
"A1F83G8C2ARO7P",
"A1PA6795UKMFR9",
"A13V1IB3VIYZZH",
"AZMDEXL2RVFNN",
"A38D8NSA03LJTC",
"A1ZFFQZ3HTUKT9",
"APJ6JRA9NG5V4",
"A1RKKUPIHCS9HS",
"A62U237T8HV6N",
"AFQLKURYRPEL8",
"A1NYP31CE519TD",
"A1805IZSGTT6HS",
"A33AVAJ2PDY3EV",
"AMEN7PMS3EDWL",
"A2NODRKZP88ZB9",
"A1C3SOZRARQ6R3",
],
"processingEndTime": "2022-07-06T09:12:07+00:00",
"processingStartTime": "2022-07-06T09:12:07+00:00",
"processingStatus": "DONE",
"reportDocumentId": "amzn1.spdoc.1.3.f7f43990-7c58-40f2-a93f-565b79a88269.T3OS2416I1AAXM.0000",
"reportId": "85948019111",
"reportType": "GET_V2_SETTLEMENT_REPORT_DATA_FLAT_FILE",
},
]
}


@pytest.fixture
def settlement_reports_stream():
def _internal(start_date: str = START_DATE_1, end_date: str = END_DATE_1):
aws_signature = AWSSignature(
service="execute-api",
aws_access_key_id="AccessKeyId",
aws_secret_access_key="SecretAccessKey",
aws_session_token="SessionToken",
region="US",
)
stream = FlatFileSettlementV2Reports(
url_base="https://test.url",
aws_signature=aws_signature,
replication_start_date=start_date,
replication_end_date=end_date,
marketplace_id="id",
authenticator=None,
period_in_days=0,
report_options=None,
max_wait_seconds=500,
)
return stream

return _internal


def test_stream_slices_method(mocker, settlement_reports_stream):
response = requests.Response()
mocker.patch.object(response, "json", return_value=generated_reports_from_amazon)

data = response.json().get("payload", list())

slices = [{"report_id": e.get("reportId")} for e in data]

for i in range(len(slices)):
report = settlement_reports_stream()._create_report(sync_mode=SyncMode.full_refresh, stream_slice=slices[i])
assert report.get("reportId") == generated_reports_from_amazon.get("payload")[i].get("reportId")
1 change: 1 addition & 0 deletions docs/integrations/sources/amazon-seller-partner.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ This source is capable of syncing the following tables and their data:

| Version | Date | Pull Request | Subject |
|:---------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------|
| `0.2.23` | 2022-06-08 | [\#13604](https://github.com/airbytehq/airbyte/pull/13604) | Add new streams: Fullfiments returns and Settlement reports |
| `0.2.22` | 2022-06-15 | [\#13633](https://github.com/airbytehq/airbyte/pull/13633) | Fix - handle start date for financial stream |
| `0.2.21` | 2022-06-01 | [\#13364](https://github.com/airbytehq/airbyte/pull/13364) | Add financial streams |
| `0.2.20` | 2022-05-30 | [\#13059](https://github.com/airbytehq/airbyte/pull/13059) | Add replication end date to config |
Expand Down

0 comments on commit 00c3ec7

Please sign in to comment.