-
Notifications
You must be signed in to change notification settings - Fork 0
/
Bulk_API_Report.py
139 lines (106 loc) · 3.55 KB
/
Bulk_API_Report.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python3
import argparse
import csv
import dataclasses
import json
import os
import typing

import requests
########### DESERIALIZERS ###########
class Error(Exception):
    """Error reported by the OAuth2 token endpoint.

    The constructor parameters mirror the keys of an OAuth2 error payload so
    that the decoded JSON body can be passed straight in as keyword
    arguments (``Error(**payload)``).

    Args:
        error: Short error code from the payload; stored on ``self.errors``.
        error_description: Human-readable message; becomes ``str(exc)``.
        **extra: Any other keys present in the payload (for example the
            optional ``error_uri``). They are kept on ``self.extra`` rather
            than triggering a ``TypeError`` that would hide the real failure.
    """

    def __init__(self, error=None, error_description=None, **extra):
        super().__init__(error_description)
        self.errors = error
        self.extra = extra
@dataclasses.dataclass
class Oauth2:
    """Token response; built elsewhere in this file via ``Oauth2(**payload)``."""

    id: str
    access_token: str
    instance_url: str
    issued_at: str
    signature: str
    token_type: str

    @property
    def buildAuthHeader(self):
        """Return the ``Authorization`` header dict for this token."""
        credential = "{} {}".format(self.token_type, self.access_token)
        return {"Authorization": credential}
@dataclasses.dataclass
class Jobs:
    """One page of a job listing, built from the decoded JSON response body.

    Field names match the JSON keys so the payload can be passed as keyword
    arguments (``Jobs(**payload)``).
    """

    # Completion flag; a JSON true/false decodes to bool, not str.
    done: bool
    # Relative URL of the next page. None when there is no further page;
    # defaults to None so a payload that omits the key still deserializes.
    nextRecordsUrl: typing.Optional[str] = None
    # Job records on this page; a fresh empty list when the key is absent.
    records: list = dataclasses.field(default_factory=list)
########### END DESERIALIZERS ###########
# REST paths; each is appended to a host URL to form the full request URL.
ENDPOINTS = dict(
    login="/services/oauth2/token",
    jobs="/services/data/v47.0/jobs/ingest",
)
# Login hosts, keyed by the values the --env command-line option accepts.
URLS = dict(
    prod="https://login.salesforce.com",
    dev="https://test.salesforce.com",
)
def login(host: str, endpoint: str, creds: dict) -> Oauth2:
    """Exchange credentials for an access token.

    Args:
        host: Base URL of the login server.
        endpoint: Path of the token endpoint; appended to ``host``.
        creds: Request parameters for the token endpoint (loaded by the
            caller from the credentials JSON file).

    Returns:
        The token response wrapped in an ``Oauth2`` instance.

    Raises:
        Error: The server answered with a status other than 200. When the
            body is a JSON object its ``error`` / ``error_description`` are
            used; otherwise the status code and raw body text are reported.
    """
    url = host + endpoint
    print(f"[*] Retrieving access token from {url}")
    # Send the credentials as a form-encoded body rather than as query
    # parameters: query strings are routinely written to proxy and server
    # logs, which would expose the password and client secret.
    resp = requests.post(url, data=creds)
    if resp.status_code != 200:
        try:
            payload = resp.json()
        except ValueError:
            # Body is not JSON (e.g. an HTML error page).
            payload = None
        if not isinstance(payload, dict):
            raise Error(error=resp.status_code, error_description=resp.text)
        # Pick the two known keys explicitly so an unexpected extra key in
        # the payload cannot turn into a TypeError.
        raise Error(
            error=payload.get("error"),
            error_description=payload.get("error_description"),
        )
    print("[!] Token received")
    payload = resp.json()
    # Drop keys Oauth2 does not declare so an additional field in the token
    # response does not abort the login.
    known = {field.name for field in dataclasses.fields(Oauth2)}
    return Oauth2(**{key: value for key, value in payload.items() if key in known})
def retrieve_all_bulk_api_jobs(host: str, endpoint: str, oauth2: Oauth2) -> list:
    """Fetch every page of the job listing and return all records.

    Starts at ``host + endpoint`` and keeps following the ``nextRecordsUrl``
    of each page until a page reports ``None`` for it.

    Args:
        host: Base URL of the instance to query.
        endpoint: Path of the first page; appended to ``host``.
        oauth2: Token whose ``buildAuthHeader`` is sent with every request.

    Returns:
        The records of all pages, concatenated in page order.

    Raises:
        requests.HTTPError: A page request returned a 4xx/5xx status.
    """
    records = []
    next_endpoint = endpoint
    # Iterate instead of recursing once per page, so the number of pages is
    # not bounded by the interpreter's recursion limit.
    while True:
        print(f"[*] Retrieving bulk API jobs information from {host + next_endpoint}")
        resp = requests.get(host + next_endpoint, headers=oauth2.buildAuthHeader)
        # No-op for successful responses, so no status pre-check is needed.
        resp.raise_for_status()
        page = Jobs(**resp.json())
        records.extend(page.records)
        if page.nextRecordsUrl is None:
            return records
        next_endpoint = page.nextRecordsUrl
def response_to_csv(json_data: list) -> None:
    """Write the records to ``Bulk API Report.csv`` in the working directory.

    Args:
        json_data: Sequence of dicts, one per CSV row. Rows may have
            different keys; a key missing from a row is written as an empty
            cell.
    """
    # Union of all row keys in first-seen order. A set would also
    # de-duplicate, but its iteration order changes between runs, which
    # would shuffle the report's columns.
    csv_headers = list(dict.fromkeys(key for row in json_data for key in row))
    # Explicit UTF-8 so non-ASCII values do not depend on the platform's
    # default encoding.
    with open("Bulk API Report.csv", 'w', newline='', encoding="utf-8") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=csv_headers)
        writer.writeheader()
        writer.writerows(json_data)
if __name__ == "__main__":
    oauth2_response = None
    if os.path.exists("oauth_response.json"):
        # Reuse the token written by a previous run.
        # NOTE(review): nothing here checks that the cached token is still
        # valid; delete oauth_response.json to force a fresh login.
        with open("oauth_response.json") as file:
            oauth2_response = Oauth2(**json.load(file))
    else:
        # Command-line arguments are only needed (and only parsed) when a
        # fresh login is required.
        parser = argparse.ArgumentParser(description="Get Bulk API report")
        parser.add_argument(
            "--creds-file",
            dest="creds",
            action="store",
            required=True,
            help="Json file with user credentials and connected app client id and client secret."
        )
        parser.add_argument(
            "--env",
            "--environment",
            dest="env",
            action="store",
            choices=['prod', 'dev'],
            required=True,
            help="Environment where to pull Bulk API Report"
        )
        args = parser.parse_args()
        with open(args.creds) as file:
            oauth2_response = login(
                # Honour --env; the login host was previously fixed to
                # 'prod' regardless of the option.
                URLS[args.env],
                ENDPOINTS["login"],
                json.load(file)
            )
        # NOTE(review): this stores the access token on disk in plain text.
        with open('oauth_response.json', 'w') as output:
            json.dump(oauth2_response.__dict__, output)
    records = retrieve_all_bulk_api_jobs(
        oauth2_response.instance_url,
        ENDPOINTS["jobs"],
        oauth2_response
    )
    print(f"[*] Dumping {len(records)} rows to Bulk_API_Jobs.json")
    with open("Bulk_API_Jobs.json", 'w') as output_file:
        json.dump(records, output_file)
    print("[*] Dumping Response to a CSV file.")
    response_to_csv(records)