-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy paths3_enable_logging.py
117 lines (98 loc) · 4.55 KB
/
s3_enable_logging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
'''
s3_enable_logging
What it does: Turns on server access logging. The target bucket needs to be in the same region as the remediation bucket or it'll throw a CrossLocationLoggingProhibitted error. This bot will create a bucket to log to as well.
Usage: AUTO: s3_enable_logging
Limitations: none
'''
import boto3
import json
from botocore.exceptions import ClientError
## Turn on S3 bucket logging
def run_action(boto_session, rule, entity, params):
bucket_name = entity['id']
accountNumber = entity['accountNumber']
region = entity['region'].replace("_", "-")
text_output = ""
s3_client = boto_session.client('s3')
s3_resource = boto_session.resource('s3')
bucket_logging = s3_resource.BucketLogging(bucket_name)
target_bucket_name = accountNumber + "s3accesslogs" + region
# The target bucket needs to be in the same region as the remediation bucket or it'll throw a CrossLocationLoggingProhibitted error.
try:
# Check if the bucket exists. If not, create one
s3_client.head_bucket(Bucket=target_bucket_name) # Check if the bucket exists. If not, create one
except ClientError:
# The bucket does not exist or you have no access. Create it
try:
if region == "us-east-1":
result = s3_client.create_bucket(
Bucket=target_bucket_name,
)
elif region == "eu-west-1":
region = "EU"
result = s3_client.create_bucket(
Bucket=target_bucket_name,
CreateBucketConfiguration={'LocationConstraint': region}
)
else:
result = s3_client.create_bucket(
Bucket=target_bucket_name,
CreateBucketConfiguration={'LocationConstraint': region}
)
responseCode = result['ResponseMetadata']['HTTPStatusCode']
if responseCode >= 400:
text_output = "Unexpected error: %s \n" % str(result)
else:
text_output = "Logging bucket created %s \n" % target_bucket_name
except ClientError as e:
text_output = "Unexpected error: %s \n" % e
try:
result = bucket_logging.put(
BucketLoggingStatus={
'LoggingEnabled': {
'TargetBucket': target_bucket_name,
'TargetPrefix': ''
}
}
)
responseCode = result['ResponseMetadata']['HTTPStatusCode']
if responseCode >= 400:
text_output = text_output + "Unexpected error enable logging: %s \n" % str(result)
raise Exception(text_output);
loggingPolicy = {
"Effect": "Allow",
"Principal": {
"Service": "logging.s3.amazonaws.com"
},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::" + target_bucket_name + "/*",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:::" + bucket_name}
}
}
try:
existing_policy = s3_client.get_bucket_policy(Bucket=target_bucket_name)
existing_policy_json = json.loads(existing_policy['Policy'])
existing_policy_json['Statement'].append(loggingPolicy)
resultUpdatePolicy = s3_resource.Bucket(target_bucket_name).Policy().put(
Policy=json.dumps(existing_policy_json))
responseCodeUpdatePolicy = resultUpdatePolicy['ResponseMetadata']['HTTPStatusCode']
if responseCodeUpdatePolicy >= 400:
text_output = text_output + "Unexpected error Update Policy: %s \n" % str(result)
raise Exception(text_output)
except s3_client.exceptions.from_code('NoSuchBucketPolicy'):
policy = {
"Version": "2012-10-17",
"Statement": [loggingPolicy]}
resultUpdatePolicy = s3_resource.Bucket(target_bucket_name).Policy().put(
Policy=json.dumps(policy))
responseCodeUpdatePolicy = resultUpdatePolicy['ResponseMetadata']['HTTPStatusCode']
if responseCodeUpdatePolicy >= 400:
text_output = text_output + "Unexpected error Update Policy: %s \n" % str(result)
raise Exception(text_output)
text_output = text_output + "Bucket logging enabled from bucket: %s to bucket: %s \n" % (
bucket_name, target_bucket_name)
except ClientError as e:
text_output = text_output + "Unexpected error: %s \n" % e
return text_output