-
Notifications
You must be signed in to change notification settings - Fork 35
/
f5-waf-quick-patch-cve-2021-44228.py
162 lines (128 loc) · 6.11 KB
/
f5-waf-quick-patch-cve-2021-44228.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# F5 WAF - quick patch for CVE-2021-44228
# This script creates a custom signature set with the signatures relevant to the CVE, assigns it to all
# policies in blocking mode, enforces those signatures, and applies the changes to the policies
# TODO: implement general error handling
# WARNING: this might increase control plane CPU and memory usage
# Ismael Goncalves
import argparse
import csv
import datetime
import getpass
import json
import os
import requests
import urllib3
# Attack signature IDs relevant to CVE-2021-44228
# https://support.f5.com/csp/article/K19026212
sigs = [
    '200104768',
    '200104769',
    '200004450',
    '200004451',
    '200004474',
    '200104770',
    '200104771',
    '200104772',
    '200104773',
]
# name of the custom signature set this script creates
sig_set = 'CVE-2021-44228'
def get_token(b, url_base, creds):
    """Log in to a BIG-IP and return an authentication token.

    Args:
        b: session-like object; ``b.post(url, body)`` must return an object
            with a ``json()`` method.
        url_base: base URL of the management API; ``/shared/authn/login``
            is appended to it.
        creds: ``(username, password)`` pair.

    Returns:
        The token string, or ``''`` when the login request fails or the
        response does not contain ``['token']['token']``.
    """
    url_auth = '%s/shared/authn/login' % url_base
    try:
        payload = {
            'username': creds[0],
            'password': creds[1],
            'loginProviderName': 'tmos',
        }
        token = b.post(url_auth, json.dumps(payload)).json()['token']['token']
    except Exception:
        # Best effort: any request/parse failure means "no token" and the
        # caller reports it. Unlike a bare ``except:``, this lets
        # KeyboardInterrupt and SystemExit propagate so the run can be aborted.
        token = ''
    return token
def audit_asm_policies_high_level(device):
    """Apply the CVE signature set to every blocking-mode policy on *device*.

    Fetches the device's ASM policies through the module-level ``bigip``
    session, skips parent policies and policies whose enforcement mode is
    not ``blocking``, and calls ``apply_signature_set`` with the id of each
    remaining policy.
    """
    print('Working on ASM policies for device %s - only policies in BLOCKING mode' % device)

    # request only the id, name, enforcementMode and type of each policy
    policies_url = 'https://%s/mgmt/tm/asm/policies/?$select=id,name,enforcementMode,type' % device
    policies = bigip.get(policies_url).json()['items']

    for policy in policies:
        # parent policies are never touched; everything else must be blocking
        if policy['type'] == 'parent' or policy['enforcementMode'] != 'blocking':
            continue
        print('Enforcing selected signatures on device %s policy %s' % (device, policy['name']))
        apply_signature_set(policy['id'])
def apply_signature_set(policy_id):
    """Attach the CVE signature set to one policy, enforce it and apply.

    Reads the module-level ``bigip`` session, ``device`` host name,
    ``sig_set_id`` and ``sigs``. In order, it:

    1. posts the signature-set reference to the policy,
    2. patches each signature in ``sigs`` with ``performStaging`` false,
    3. posts an apply-policy task for the policy.

    The request payload, the first response body and the status codes of
    steps 2 and 3 are printed.
    """
    host_and_policy = (device, policy_id)

    # 1. assign the new signature set to the policy
    sig_set_body = '{"signatureSetReference":{"link":"https://localhost/mgmt/tm/asm/signature-sets/%s"}}' % (sig_set_id)
    print(sig_set_body)
    response = bigip.post('https://%s/mgmt/tm/asm/policies/%s/signature-sets' % host_and_policy, sig_set_body)
    print(response.text)

    # 2. enforce each signature of the CVE within this policy
    enforce_body = '{"performStaging":false}'
    for signature_id in sigs:
        signature_url = "https://%s/mgmt/tm/asm/policies/%s/signatures?$expand=signatureReference&$filter=inPolicy+eq+true+and+signature/signatureId+eq+'%s'" % (device, policy_id, signature_id)
        response = bigip.patch(signature_url, enforce_body)
        print("Status code for enforcement: " + str(response.status_code))

    # 3. apply the changes to the policy
    apply_body = '{"policyReference":{"link":"https://localhost/mgmt/tm/asm/policies/%s"}}' % (policy_id)
    response = bigip.post('https://%s/mgmt/tm/asm/tasks/apply-policy' % (device), apply_body)
    print("Status code for apply changes: " + str(response.status_code))
def create_signature_set(device):
    """Create the custom signature set for the CVE on *device*.

    Looks up, through the module-level ``bigip`` session, the ``id`` of
    every signature whose signatureId is listed in the module-level
    ``sigs``, then posts a manual, user-defined signature set named
    ``sig_set`` that references all of them.

    Args:
        device: host name or address of the BIG-IP to talk to.

    Returns:
        The ``id`` field of the signature set returned by the device.

    Raises:
        RuntimeError: if the creation response carries no ``id``.
    """
    # obtain signature hash (id) info about the relevant signatures for the CVE
    sig_ids = []
    for signature_id in sigs:
        url_sig = 'https://%s/mgmt/tm/asm/signatures?$filter=signatureId+eq+%s' % (device, signature_id)
        found = bigip.get(url_sig).json()
        sig_ids.extend(item['id'] for item in found['items'])
    print(sig_ids)

    # build the payload as a real JSON document rather than by string
    # concatenation, so quoting/escaping is always valid
    url_sig_set = 'https://%s/mgmt/tm/asm/signature-sets' % device
    payload_sig_set = json.dumps({
        'name': sig_set,
        'category': 'User-defined',
        'type': 'manual',
        'signatureReferences': [
            {'link': 'https://localhost/mgmt/tm/asm/signatures/%s' % sig_id}
            for sig_id in sig_ids
        ],
    })

    created = bigip.post(url_sig_set, payload_sig_set).json()
    if 'id' not in created:
        # fail with the device's own answer instead of a bare KeyError: 'id'
        raise RuntimeError(
            'Unable to create signature set %s on device %s: %s' % (sig_set, device, created)
        )
    return created['id']
def check_active(device):
    """Return True when *device* reports itself active in traffic-group-1.

    Reads the device's configured hostname, then walks the traffic-group-1
    statistics (both through the module-level ``bigip`` session) looking
    for an entry whose device name contains that hostname and whose
    failover state contains ``active``. Returns False if none matches.
    """
    # obtain device name
    hostname_url = 'https://%s/mgmt/tm/sys/global-settings?$select=hostname' % device
    hostname = bigip.get(hostname_url).json()['hostname']

    stats_url = 'https://%s/mgmt/tm/cm/traffic-group/traffic-group-1/stats?$select=deviceName,failoverState' % device
    group_entries = bigip.get(stats_url).json()['entries']

    for entry in group_entries.values():
        # each nestedStats looks similar to
        # {'entries': {'deviceName': {'description': '/Common/bigip1.f5labs.net'}, 'failoverState': {'description': 'standby'}}}
        member_stats = entry['nestedStats']['entries']
        member_name = member_stats['deviceName']['description']
        failover_state = member_stats['failoverState']['description']
        if hostname in member_name and 'active' in failover_state:
            return True
    return False
if __name__ == "__main__":
    # Entry point: read a file with one BIG-IP per line and, for every
    # device that is reachable and active, create the CVE signature set and
    # push it to all blocking-mode policies.
    #
    # NOTE: ``device``, ``bigip`` and ``sig_set_id`` are deliberately kept as
    # module-level names; the functions in this file read them as globals.

    # certificate verification is disabled on the session below
    urllib3.disable_warnings()

    parser = argparse.ArgumentParser()
    parser.add_argument("device", help='BIG-IP devices list separated by line')
    args = vars(parser.parse_args())
    devices_file = args['device']

    username = input('Enter your username: ')
    password = getpass.getpass('Enter your password')

    with open(devices_file, 'r') as a_file:
        for line in a_file:
            device = line.strip()
            if not device:
                # blank line in the list: nothing to connect to
                continue

            # TODO - test connectivity with each device and report on the ones failing
            url_base = 'https://%s/mgmt' % device

            # one session per device, closed when the device is done
            with requests.session() as bigip:
                bigip.headers.update({'Content-Type': 'application/json'})
                bigip.auth = (username, password)
                bigip.verify = False

                token = get_token(bigip, url_base, (username, password))
                if not token:
                    print('Unable to obtain token for device ' + device)
                    continue

                # from here on authenticate with the token only
                bigip.headers.update({'X-F5-Auth-Token': token})
                bigip.auth = None

                if not check_active(device):
                    print('Device ' + device + ' is not active, skipping it...')
                    continue

                sig_set_id = create_signature_set(device)
                audit_asm_policies_high_level(device)