-
Notifications
You must be signed in to change notification settings - Fork 16
/
clean_discovered_docker.py
79 lines (65 loc) · 2.72 KB
/
clean_discovered_docker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Script used to clean docker images older than X days and not discovered"""
import os
from datetime import datetime
from configparser import ConfigParser
from cbw_api_toolbox.cbw_api import CBWApi
# Change to True to delete servers
DELETE = False
# Delete if creation date older than X days
DELETE_AFTER = 7
def connect_api():
'''Connect to the API and test connection'''
conf = ConfigParser()
conf.read(os.path.join(os.path.abspath(
os.path.dirname(__file__)), '..', 'api.conf'))
client = CBWApi(conf.get('cyberwatch', 'url'), conf.get(
'cyberwatch', 'api_key'), conf.get('cyberwatch', 'secret_key'))
client.ping()
return client
def find_dockers(client):
'''Find docker servers and return server_id + creation date'''
dockers_details = []
all_dockers = client.docker_images()
for docker in all_dockers:
if docker.server_id:
detail = client.asset(str(docker.server_id))
dockers_details.append(
{"server_id": str(docker.server_id), "created_at": detail.created_at})
return dockers_details
def find_discoveries(client):
'''Find discoveries with docker images'''
ids = []
discoveries_details = client.hosts()
for host in discoveries_details:
if host.discovery.type == "CbwAssets::Discovery::DockerRegistry":
for id in host.server_ids:
ids.append(str(id))
return ids
def find_to_clean(dockers, discoveries):
'''Find docker images to delete'''
to_clean = []
for docker in dockers:
if docker['server_id'] not in discoveries:
date = datetime.fromisoformat(
docker['created_at']).replace(tzinfo=None)
time = datetime.now() - date
if time.days > DELETE_AFTER:
to_clean.append(docker)
return to_clean
def display_and_delete(delete_list, what, client, delete=DELETE):
'''Display servers then delete them'''
print('\n\n================= Total of {} {} to delete (delete={}) ================='.format(len(delete_list),
what,
delete))
for delete_server in delete_list:
print('{} --- {}'.format(delete_server['server_id'], delete_server['created_at']))
if delete is True:
client.delete_server(str(delete_server['server_id']))
def launch_script():
'''Launch script'''
client = connect_api()
dockers = find_dockers(client)
discoveries = find_discoveries(client)
clean = find_to_clean(dockers, discoveries)
display_and_delete(clean, 'docker image', client)
launch_script()