Skip to content

Commit

Permalink
🎉 Source Shopify: implement BalanceTransactions stream (#10204)
Browse files Browse the repository at this point in the history
  • Loading branch information
harshithmullapudi authored Mar 22, 2022
1 parent 103dd2a commit 9508f61
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@
- name: Shopify
sourceDefinitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerRepository: airbyte/source-shopify
dockerImageTag: 0.1.35
dockerImageTag: 0.1.36
documentationUrl: https://docs.airbyte.io/integrations/sources/shopify
icon: shopify.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7430,7 +7430,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-shopify:0.1.35"
- dockerImage: "airbyte/source-shopify:0.1.36"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/shopify"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-shopify/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ COPY source_shopify ./source_shopify
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.35
LABEL io.airbyte.version=0.1.36
LABEL io.airbyte.name=airbyte/source-shopify
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ tests:
timeout_seconds: 3600
# some streams hold data only for some time, therefore certain streams could be empty while sync.
# 'abandoned_checkouts' stream holds data up to 1 month.
empty_streams: ["abandoned_checkouts"]
empty_streams: ["abandoned_checkouts", "balance_transactions"]
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,8 @@
"orders": {
"updated_at": "2025-03-03T03:47:46-08:00"
}
},
"balance_transactions": {
"id": 9999999999999
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,18 @@
"sync_mode": "incremental",
"cursor_field": ["updated_at"],
"destination_sync_mode": "append"
},
{
"stream": {
"name": "balance_transactions",
"json_schema": {},
"supported_sync_modes": ["incremental", "full_refresh"],
"source_defined_cursor": true,
"default_cursor_field": ["id"]
},
"sync_mode": "incremental",
"cursor_field": ["id"],
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,8 @@
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
}
},
"balance_transactions": {
"id": 29427031703741
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
"type": ["null", "object"],
"properties": {
"id": {
"type": "integer"
},
"type": {
"type": ["null", "string"]
},
"test": {
"type": ["null", "boolean"]
},
"payout_id": {
"type": ["null", "integer"]
},
"payout_status": {
"type": ["null", "string"]
},
"currency": {
"type": ["null", "string"]
},
"amount": {
"type": ["null", "number"]
},
"fee": {
"type": ["null", "number"]
},
"net": {
"type": ["null", "number"]
},
"source_id": {
"type": ["null", "integer"]
},
"source_type": {
"type": ["null", "string"]
},
"source_order_transaction_id": {
"type": ["null", "integer"]
},
"source_order_id": {
"type": ["null", "integer"]
},
"processed_at": {
"type": ["null", "string"],
"format": "date-time"
},
"shop_url": {
"type": ["null", "string"]
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ def __init__(self, config: Dict):
@property
def url_base(self) -> str:
    """Build the versioned Admin API root URL for the shop named in the config."""
    shop_name = self.config["shop"]
    version = self.api_version
    return f"https://{shop_name}.myshopify.com/admin/api/{version}/"

@property
def default_filter_field_value(self) -> Union[int, str]:
    """
    Initial value sent for the `filter_field` request parameter.

    Streams that filter by `since_id` need an `int`, so they start from 0.
    Every other stream uses a `str` value and starts from the user defined `Start Date`.
    """
    if self.filter_field == "since_id":
        return 0
    return self.config["start_date"]

@staticmethod
def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand All @@ -53,7 +60,7 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
params.update(**next_page_token)
else:
params["order"] = f"{self.order_field} asc"
params[self.filter_field] = self.config["start_date"]
params[self.filter_field] = self.default_filter_field_value
return params

@limiter.balance_rate_limit()
Expand Down Expand Up @@ -92,16 +99,16 @@ def state_checkpoint_interval(self) -> int:
cursor_field = "updated_at"

@property
def default_comparison_value(self) -> Union[int, str]:
def default_state_comparison_value(self) -> Union[int, str]:
# certain streams are using `id` field as `cursor_field`, which requires to use `int` type,
# but many other use `str` values for this, we determine what to use based on `cursor_field` value
return 0 if self.cursor_field == "id" else ""

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
return {
self.cursor_field: max(
latest_record.get(self.cursor_field, self.default_comparison_value),
current_stream_state.get(self.cursor_field, self.default_comparison_value),
latest_record.get(self.cursor_field, self.default_state_comparison_value),
current_stream_state.get(self.cursor_field, self.default_state_comparison_value),
)
}

Expand Down Expand Up @@ -307,25 +314,31 @@ class Collects(IncrementalShopifyStream):
The Collect stream is the link between Products and Collections, if the Collection is created for Products,
the `collect` record is created, it's reasonable to Full Refresh all collects. As for Incremental refresh -
we would use the since_id specifically for this stream.
"""

data_field = "collects"
cursor_field = "id"
order_field = "id"
filter_field = "since_id"

def path(self, **kwargs) -> str:
return f"{self.data_field}.json"

def request_params(
self, stream_state: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
# If there is a next page token then we should only send pagination-related parameters.
if not next_page_token and not stream_state:
params[self.filter_field] = 0
return params

class BalanceTransactions(IncrementalShopifyStream):

    """
    Stream of Shopify Payments balance transactions.

    The BalanceTransactions stream does not support Incremental Refresh based on datetime fields, only `since_id` is supported:
    https://shopify.dev/api/admin-rest/2021-07/resources/transactions
    """

    # last segment of the request path built in `path()`;
    # presumably also the key holding the records in the response - TODO confirm against the base class
    data_field = "transactions"
    # state is tracked by the integer record `id`, not by a timestamp
    cursor_field = "id"
    order_field = "id"
    # `since_id` is the only filter this endpoint supports for incremental reads (see the class docstring)
    filter_field = "since_id"

    def path(self, **kwargs) -> str:
        """Return the request path of the Shopify Payments balance transactions endpoint."""
        return f"shopify_payments/balance/{self.data_field}.json"


class OrderRefunds(ShopifySubstream):
Expand Down Expand Up @@ -514,6 +527,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
OrderRisks(config),
TenderTransactions(config),
Transactions(config),
BalanceTransactions(config),
Pages(config),
PriceRules(config),
DiscountCodes(config),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"read_locations": ["Locations"],
"read_inventory": ["InventoryItems", "InventoryLevels"],
"read_merchant_managed_fulfillment_orders": ["FulfillmentOrders"],
"read_shopify_payments_payouts": ["BalanceTransactions"],
}


Expand Down
40 changes: 40 additions & 0 deletions docs/integrations/sources/shopify.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,45 @@ This source can sync data for the [Shopify API](https://help.shopify.com/en/api/

This Source Connector is based on the [Airbyte CDK](https://docs.airbyte.io/connector-development/cdk-python).

## Troubleshooting

Check out common troubleshooting issues for the Shopify source connector on our Discourse [here](https://discuss.airbyte.io/tags/c/connector/11/source-shopify).

### Output schema

This Source is capable of syncing the following core Streams:

* [Abandoned Checkouts](https://help.shopify.com/en/api/reference/orders/abandoned_checkouts)
* [Collects](https://help.shopify.com/en/api/reference/products/collect)
* [Custom Collections](https://help.shopify.com/en/api/reference/products/customcollection)
* [Customers](https://help.shopify.com/en/api/reference/customers)
* [Draft Orders](https://help.shopify.com/en/api/reference/orders/draftorder)
* [Discount Codes](https://shopify.dev/docs/admin-api/rest/reference/discounts/discountcode)
* [Metafields](https://help.shopify.com/en/api/reference/metafield)
* [Orders](https://help.shopify.com/en/api/reference/orders)
* [Orders Refunds](https://shopify.dev/api/admin/rest/reference/orders/refund)
* [Orders Risks](https://shopify.dev/api/admin/rest/reference/orders/order-risk)
* [Products](https://help.shopify.com/en/api/reference/products)
* [Transactions](https://help.shopify.com/en/api/reference/orders/transaction)
* [Balance Transactions](https://shopify.dev/api/admin-rest/2021-07/resources/transactions)
* [Pages](https://help.shopify.com/en/api/reference/online-store/page)
* [Price Rules](https://help.shopify.com/en/api/reference/discounts/pricerule)
* [Locations](https://shopify.dev/api/admin-rest/2021-10/resources/location)
* [InventoryItems](https://shopify.dev/api/admin-rest/2021-10/resources/inventoryItem)
* [InventoryLevels](https://shopify.dev/api/admin-rest/2021-10/resources/inventorylevel)
* [Fulfillment Orders](https://shopify.dev/api/admin-rest/2021-07/resources/fulfillmentorder)
* [Fulfillments](https://shopify.dev/api/admin-rest/2021-07/resources/fulfillment)
* [Shop](https://shopify.dev/api/admin-rest/2021-07/resources/shop)

#### NOTE:

For a better experience with `Incremental Refresh`, the following is recommended:

* `Order Refunds`, `Order Risks`, `Transactions` should be synced along with `Orders` stream.
* `Discount Codes` should be synced along with `Price Rules` stream.

If child streams are synced without their parent stream, a full sync takes place and the records are filtered afterwards.

### Data type mapping

| Integration Type | Airbyte Type |
Expand Down Expand Up @@ -100,6 +139,7 @@ This is expected when the connector hits the 429 - Rate Limit Exceeded HTTP Erro

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.36 | 2022-03-22 | [9850](https://github.com/airbytehq/airbyte/pull/9850) | Added `BalanceTransactions` stream |
| 0.1.35 | 2022-03-07 | [10915](https://github.com/airbytehq/airbyte/pull/10915) | Fix a bug which caused `full-refresh` syncs of child REST entities configured for `incremental` |
| 0.1.34 | 2022-03-02 | [10794](https://github.com/airbytehq/airbyte/pull/10794) | Minor specification re-order, fixed links in documentation |
| 0.1.33 | 2022-02-17 | [10419](https://github.com/airbytehq/airbyte/pull/10419) | Fixed wrong field type for tax_exemptions for `Abandoned_checkouts` stream |
Expand Down

0 comments on commit 9508f61

Please sign in to comment.