# This Python script will submit a POST to the
# realtime scan API endpoint to initiate a new
# scan. It then polls the endpoint for results.
import base64
import hashlib
import hmac
import json
import os
import sys
import time

import requests
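# Obtain a CloudSploit API key and secret from the dashboard.
# Supply them through the environment where possible, so that real
# credentials are never saved in (or committed with) this file; the string
# literals below are placeholders only. The CLOUDSPLOIT_* variable names are
# this script's own convention, not something defined by the API.
api_key = os.environ.get("CLOUDSPLOIT_API_KEY", "replace-with-key")
secret = os.environ.get("CLOUDSPLOIT_SECRET", "replace-with-secret")
key_id = 1234  # The key_id you want to scan (obtain this via GET /keys)
# Keep this an https:// URL: the API key is sent as a header on every request.
base_url = "https://api.cloudsploit.com"
interval = 5  # Seconds between polls; decreasing this value will result in being rate limited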
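def make_call(method, path, body):
    """Send one HMAC-signed request to the API and return the response text.

    The signature is the hex HMAC-SHA256, keyed with ``secret``, of the
    concatenation ``timestamp + method + path + body``. Only the API key, the
    timestamp and the signature are sent as headers; the secret itself is
    never transmitted.

    Args:
        method: HTTP verb. "POST" sends ``body``; any other value is issued
            as a GET without a request body.
        path: Request path appended to ``base_url``, e.g. "/v2/realtimes".
        body: JSON-serialisable payload, or a falsy value for no body.

    Returns:
        The response body as text, whatever the HTTP status code was. The
        status is not checked here, so callers must look for an "errors"
        member in the decoded JSON.
    """
    request_timeout = 30  # seconds; without one a stalled connection blocks forever

    timestamp = str(int(time.time() * 1000))  # milliseconds since the epoch
    endpoint = base_url + path

    # Serialise once, compactly, and send exactly this string: the signature
    # covers body_str, and the server presumably recomputes the HMAC over the
    # body it receives, so re-serialising it would break verification.
    body_str = json.dumps(body, separators=(',', ':')) if body else ""

    signed_payload = timestamp + method + path + body_str

    def _as_bytes(value):
        # hmac needs bytes on Python 3; Python 2 str already is bytes.
        return value if isinstance(value, bytes) else value.encode("utf-8")

    signature = hmac.new(
        _as_bytes(secret),
        msg=_as_bytes(signed_payload),
        digestmod=hashlib.sha256,
    ).hexdigest()

    headers = {
        "Accept": "application/json",
        "X-API-Key": api_key,
        "X-Signature": signature,
        "X-Timestamp": timestamp,
        "content-type": "application/json"
    }

    # Compare by value: `is` tests object identity and only appears to work
    # for string literals because CPython happens to intern them.
    if method == "POST":
        response = requests.post(
            endpoint, headers=headers, data=body_str, timeout=request_timeout
        )
    else:
        response = requests.get(
            endpoint, headers=headers, timeout=request_timeout
        )
    return response.text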
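# Ask the API to start a realtime scan of the configured cloud key.
body = {
    "key_id": key_id
}

realtime_scan = make_call("POST", "/v2/realtimes", body)

# make_call returns the response text, so an unusable reply is an empty or
# non-JSON string rather than None. Every failure path exits non-zero so that
# a wrapper (cron job, CI step) does not mistake a failed scan for a clean one.
if not realtime_scan:
    print("Error initiating realtime scan: empty response")
    sys.exit(1)

try:
    realtime_json = json.loads(realtime_scan)
except ValueError:
    # e.g. an HTML error page returned by a proxy instead of the API.
    print("Error initiating realtime scan: " + realtime_scan)
    sys.exit(1)

if "errors" in realtime_json:
    print(json.dumps(realtime_json, indent=4, separators=(',', ': ')))
    sys.exit(1)

# Tolerate a missing "data" object instead of dying with a KeyError.
realtime_id = (realtime_json.get("data") or {}).get("realtime_id")
if realtime_id is None:
    print("Error obtaining realtime ID")
    sys.exit(1)

print("Scan created with realtime ID: " + str(realtime_id))

# Poll until the scan reports COMPLETE.
# NOTE(review): there is no upper bound on the number of polls; if the API can
# report a terminal status other than "COMPLETE", this loop never ends.
# Confirm the possible status values and add a deadline if so.
while True:
    print("Polling for results...")
    realtime_result = make_call("GET", "/v2/realtimes/" + str(realtime_id), None)
    realtime_result_json = json.loads(realtime_result)

    # An error reply has no "data" member to read the status from.
    if "errors" in realtime_result_json:
        print(json.dumps(realtime_result_json, indent=4, separators=(',', ': ')))
        sys.exit(1)

    realtime_status = realtime_result_json["data"]["status"]
    if realtime_status == "COMPLETE":
        print(json.dumps(realtime_result_json, indent=4, separators=(',', ': ')))
        sys.exit(0)

    print(realtime_result)
    time.sleep(interval)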