-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
🎉 Source Amazon Ads : Add attribution reports #16342
Changes from 22 commits
8ac8cee
e8998ed
4d86c53
57c3808
d12a9bd
e213fc4
3634a83
08dfd90
4a6f9e5
1b4b833
91d0682
8764d9f
108b81c
02c5b61
1b817b8
7744ca4
26e1694
d65e7dc
e96b497
75baea2
7d7cd06
2abdeab
f0a78f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from typing import List | ||
|
||
from .common import CatalogModel | ||
|
||
|
||
class Report(CatalogModel): | ||
date: str | ||
brandName: str | ||
marketplace: str | ||
campaignId: str | ||
productAsin: str | ||
productConversionType: str | ||
advertiserName: str | ||
adGroupId: str | ||
creativeId: str | ||
productName: str | ||
productCategory: str | ||
productSubcategory: str | ||
productGroup: str | ||
publisher: str | ||
|
||
|
||
class AttributionReportModel(CatalogModel): | ||
reports: List[Report] | ||
size: int | ||
cursorId: str |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
# | ||
# Copyright (c) 2022 Airbyte, Inc., all rights reserved. | ||
# | ||
|
||
from typing import Any, Iterable, Mapping, MutableMapping, Optional | ||
|
||
import pendulum | ||
import requests | ||
from source_amazon_ads.schemas import AttributionReportModel, Profile | ||
from source_amazon_ads.streams.common import AmazonAdsStream | ||
|
||
BRAND_REFERRAL_BONUS = "brb_bonus_amount" | ||
|
||
METRICS_MAP = { | ||
"PERFORMANCE": [ | ||
"Click-throughs", | ||
"attributedDetailPageViewsClicks14d", | ||
"attributedAddToCartClicks14d", | ||
"attributedPurchases14d", | ||
"unitsSold14d", | ||
"attributedSales14d", | ||
"attributedTotalDetailPageViewsClicks14d", | ||
"attributedTotalAddToCartClicks14d", | ||
"attributedTotalPurchases14d", | ||
"totalUnitsSold14d", | ||
"totalAttributedSales14d", | ||
], | ||
"PRODUCTS": [ | ||
"attributedDetailPageViewsClicks14d", | ||
"attributedAddToCartClicks14d", | ||
"attributedPurchases14d", | ||
"unitsSold14d", | ||
"attributedSales14d", | ||
"brandHaloDetailPageViewsClicks14d", | ||
"brandHaloAttributedAddToCartClicks14d", | ||
"brandHaloAttributedPurchases14d", | ||
"brandHaloUnitsSold14d", | ||
"brandHaloAttributedSales14d", | ||
"attributedNewToBrandPurchases14d", | ||
"attributedNewToBrandUnitsSold14d", | ||
"attributedNewToBrandSales14d", | ||
"brandHaloNewToBrandPurchases14d", | ||
"brandHaloNewToBrandUnitsSold14d", | ||
"brandHaloNewToBrandSales14d", | ||
], | ||
} | ||
|
||
|
||
class AttributionReport(AmazonAdsStream): | ||
""" | ||
This stream corresponds to Amazon Advertising API - Attribution Reports | ||
https://advertising.amazon.com/API/docs/en-us/amazon-attribution-prod-3p/#/ | ||
""" | ||
|
||
model = AttributionReportModel | ||
primary_key = None | ||
data_field = "reports" | ||
page_size = 300 | ||
|
||
report_type = "" | ||
metrics = "" | ||
group_by = "" | ||
|
||
_next_page_token_field = "cursorId" | ||
_current_profile_id = "" | ||
|
||
REPORT_DATE_FORMAT = "YYYYMMDD" | ||
CONFIG_DATE_FORMAT = "YYYY-MM-DD" | ||
REPORTING_PERIOD = 90 | ||
|
||
def __init__(self, config: Mapping[str, Any], *args, **kwargs): | ||
self._start_date = config.get("start_date") | ||
self._req_start_date = "" | ||
self._req_end_date = "" | ||
|
||
super().__init__(config, *args, **kwargs) | ||
|
||
def _set_dates(self, profile: Profile): | ||
new_start_date = pendulum.now(tz=profile.timezone).subtract(days=1).date() | ||
new_end_date = pendulum.now(tz=profile.timezone).date() | ||
|
||
if self._start_date: | ||
new_start_date = max(self._start_date, new_end_date.subtract(days=self.REPORTING_PERIOD)) | ||
|
||
self._req_start_date = new_start_date.format(self.REPORT_DATE_FORMAT) | ||
self._req_end_date = new_end_date.format(self.REPORT_DATE_FORMAT) | ||
|
||
@property | ||
def http_method(self) -> str: | ||
return "POST" | ||
|
||
def read_records(self, *args, **kvargs) -> Iterable[Mapping[str, Any]]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I finally solved my local issue by adding a few profile Ids, though our dev account doesn't have any attribute reports. So, earlier, under the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. current implementation
if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you do me a favor? Can you unset the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No error when When there are no user provided profiles, API call is made to fetch all profiles. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thanks for the feedback, that's really helpful. The acceptance test is not passed. And the reason is that not all profiles are authorized to fetch reports, and the read command doesn't fail gracefully, which blocks the read on other streams. I read the codes again, looks like a profile is not independent of the others. In our dev test account, there are about 10 profiles fetched from the API, and only 4 of them are authorized to read the attribute reports. If I pass just these 4 profiles, the tests are good. But if I include another, the read command fails on that profile and stops fetching reports on other profiles. And under the Similarly, if I set a collection of profiles to Under a You may consider to use a stream slice to store profiles, and fetch reports for each profile in that slice. The GitHub connector provides a good example in fetching users in an org. Let me know if you have a question. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you help me in providing a way to reproduce the error? I tried running this command
It's returning 0 records for all the streams except
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, be happy to! firstly, check
looks like you have 3 profiles. I guess all 3 profiles are authorized to read attribute_reports_*. Can you create another profile that is not authorized and add it to the array of profiles in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any ideas on why I am not getting any error with fake profile IDs in
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure, maybe on the api side, the validation handles it before the authorization module is run? |
||
""" | ||
Iterate through self._profiles list and send read all records for each profile. | ||
""" | ||
for profile in self._profiles: | ||
try: | ||
self._set_dates(profile) | ||
self._current_profile_id = profile.profileId | ||
yield from super().read_records(*args, **kvargs) | ||
except Exception as err: | ||
self.logger.info("some error occurred: %s", err) | ||
|
||
def request_headers(self, *args, **kvargs) -> MutableMapping[str, Any]: | ||
headers = super().request_headers(*args, **kvargs) | ||
headers["Amazon-Advertising-API-Scope"] = str(self._current_profile_id) | ||
return headers | ||
|
||
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: | ||
stream_data = response.json() | ||
next_page_token = stream_data.get(self._next_page_token_field) | ||
if next_page_token: | ||
return {self._next_page_token_field: next_page_token} | ||
|
||
def path(self, **kvargs) -> str: | ||
return "/attribution/report" | ||
|
||
def request_body_json( | ||
self, | ||
stream_state: Mapping[str, Any], | ||
stream_slice: Mapping[str, Any] = None, | ||
next_page_token: Mapping[str, Any] = None, | ||
) -> Optional[Mapping]: | ||
body = { | ||
"reportType": self.report_type, | ||
"count": self.page_size, | ||
"metrics": self.metrics, | ||
"startDate": self._req_start_date, | ||
"endDate": self._req_end_date, | ||
self._next_page_token_field: "", | ||
} | ||
|
||
if self.group_by: | ||
body["groupBy"] = self.group_by | ||
|
||
if next_page_token: | ||
body[self._next_page_token_field] = next_page_token[self._next_page_token_field] | ||
|
||
return body | ||
|
||
|
||
class AttributionReportProducts(AttributionReport): | ||
report_type = "PRODUCTS" | ||
|
||
metrics = ",".join(METRICS_MAP[report_type]) | ||
|
||
group_by = "" | ||
|
||
|
||
class AttributionReportPerformanceCreative(AttributionReport): | ||
report_type = "PERFORMANCE" | ||
|
||
metrics = ",".join(METRICS_MAP[report_type]) | ||
|
||
group_by = "CREATIVE" | ||
|
||
|
||
class AttributionReportPerformanceAdgroup(AttributionReport): | ||
report_type = "PERFORMANCE" | ||
|
||
metrics_list = METRICS_MAP[report_type] | ||
metrics_list.append(BRAND_REFERRAL_BONUS) | ||
metrics = ",".join(metrics_list) | ||
|
||
group_by = "ADGROUP" | ||
|
||
|
||
class AttributionReportPerformanceCampaign(AttributionReport): | ||
report_type = "PERFORMANCE" | ||
|
||
metrics_list = METRICS_MAP[report_type] | ||
metrics_list.append(BRAND_REFERRAL_BONUS) | ||
metrics = ",".join(metrics_list) | ||
|
||
group_by = "CAMPAIGN" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this streams: sponsored_product_keywords, sponsored_product_negative_keywords, sponsored_product_targetings
added as empty ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it comes from me, irrelevant to the topic branch. It was about 2 weeks ago, the integration test says these streams are empty in the test account when I run it locally. And I have the login credential, but I can't access the OTP token, so I can't login.