-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathiamdump.py
119 lines (88 loc) · 3.2 KB
/
iamdump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import json
import os
import socketserver
import sys
import threading
service_map = {
"applicationautoscaling": "application-autoscaling",
"cloudwatchevents": "events",
"cloudwatchlogs": "logs",
"elasticloadbalancingv2": "elasticloadbalancing",
"elasticsearchservice": "es",
}
action_map = {
"s3:GetBucketAccelerateConfiguration": "s3:GetAccelerateConfiguration",
"s3:GetBucketEncryption": "s3:GetEncryptionConfiguration",
"s3:GetBucketLifecycleConfiguration": "s3:GetLifecycleConfiguration",
"s3:GetBucketReplication": "s3:GetReplicationConfiguration",
"s3:GetObjectLockConfiguration": "s3:GetBucketObjectLockConfiguration",
"s3:HeadObject": "s3:GetObject",
"s3:ListObjects": "s3:ListBucket",
}
def translate_service(service):
"""
Translates a service string from the JSON message
into a valid IAM service string.
"""
service = service.lower().replace(" ", "")
if service in service_map:
service = service_map[service]
return service
class MetricsHandler(socketserver.BaseRequestHandler):
"""
Receives JSON messages from AWS SDKs, extracts the API call
made by the SDK, and adds it to the global "api_calls" set.
"""
api_calls = set()
def handle(self):
data = json.loads(self.request[0].strip())
api_call = (data["Service"], data["Api"])
self.__class__.api_calls.add(api_call)
@classmethod
def iam_policy_json(cls):
actions = []
for service, api in cls.api_calls:
if not service:
continue
service = service.lower().replace(" ", "")
if service in service_map:
service = service_map[service]
action = "{}:{}".format(service, api)
if action in action_map:
action = action_map[action]
if action == "ec2metadata:GetMetadata":
continue
actions.append(action)
return json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{"Effect": "Allow", "Action": sorted(actions), "Resource": "*"}
],
},
indent=2,
)
def cli():
command = " ".join(sys.argv[1:])
if not command:
script = os.path.basename(sys.argv[0])
sys.stderr.write("Usage: {} <command>\n".format(script))
sys.exit(1)
# Open a socket on port 0 to let the kernal assign a port.
with socketserver.UDPServer(("localhost", 0), MetricsHandler) as server:
# Run the server in a background thread.
threading.Thread(target=server.serve_forever).start()
# Tell AWS SDKs to send metrics to this server.
os.environ["AWS_CSM_ENABLED"] = "true"
os.environ["AWS_CSM_PORT"] = str(server.server_address[1])
# Run the command that was passed in as arguments,
# and stop the server after the command has finished.
exit_code = os.system(command)
server.shutdown()
if MetricsHandler.api_calls:
sys.stderr.write("\n")
sys.stderr.write(MetricsHandler.iam_policy_json())
sys.stderr.write("\n")
sys.exit(exit_code)
if __name__ == "__main__":
cli()