-
Notifications
You must be signed in to change notification settings - Fork 0
/
transformer.py
242 lines (177 loc) · 8.28 KB
/
transformer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import pandas as pd
from custom_utils import Utils
import datetime
class Transformer():
    """Turn raw team, role and member records into DataFrames plus metrics.

    The source records are enriched in place with derived fields (counts,
    flags, resolved role keys), loaded into pandas DataFrames, cross-linked
    (each role learns which members and teams hold it) and summarised.
    """

    def __init__(self, ld_data=None, save=False):
        """Store the raw source collections.

        Args:
            ld_data: Mapping whose values are, in insertion order, the teams,
                the roles and the members collections. The values are
                unpacked by position, not by key name. Each collection is
                presumably a list of dicts -- confirm against the caller.
                ``None`` is treated as three empty collections.
            save: When truthy, ``process`` also writes the results to disk.

        Raises:
            ValueError: If ``ld_data`` does not hold exactly three values.
        """
        if ld_data is None:
            self.teams_source, self.roles_source, self.members_source = [], [], []
        else:
            self.teams_source, self.roles_source, self.members_source = ld_data.values()
        self.roles_df = None
        self.policies = {}
        self.teams_df = None
        self.members_df = None
        self.summary_metrics = {}
        self.save = save
        # (roles_source, {role _id: role key}); built lazily on first lookup.
        self._role_key_cache = None

    def process(self, output_dir=None):
        """Run the full pipeline and, when ``save`` is set, persist the output.

        Args:
            output_dir: Directory for the saved JSON files. Falls back to
                ``'output'`` when not given. Only used when ``save`` is truthy.
        """
        self._prep_roles()
        self._prep_members()
        self._prep_teams()
        self._update_members_assigned_roles()
        self._update_teams_assigned_roles()
        self._generate_summary_metrics()
        if self.save:
            # Must be passed by keyword: the first positional parameter of
            # _save_data is ``msg``, not the directory.
            self._save_data(output_dir=output_dir or 'output')

    def _role_key_by_id(self):
        """Return the ``{role _id: role key}`` map for ``roles_source``.

        The map is cached and rebuilt only when ``roles_source`` is rebound to
        a different object; in-place edits of the same list are not detected.
        """
        cached = getattr(self, '_role_key_cache', None)
        if cached is None or cached[0] is not self.roles_source:
            mapping = {role["_id"]: role["key"] for role in self.roles_source}
            cached = (self.roles_source, mapping)
            self._role_key_cache = cached
        return cached[1]

    def convert_role_id_to_key(self, arr_lookup):
        """Translate role ids into role keys.

        Args:
            arr_lookup: Iterable of role ids; ``None`` is treated as empty.

        Returns:
            The keys of the known roles, in input order. Ids with no matching
            role, and roles whose key is ``None``, are dropped.
        """
        role_key_by_id = self._role_key_by_id()
        keys = (role_key_by_id.get(role_id) for role_id in arr_lookup or ())
        return [key for key in keys if key is not None]

    @staticmethod
    def _flatten_column(frame, column):
        """Concatenate the list cells of ``column``; ``[]`` if it is absent."""
        if frame is None or column not in frame.columns:
            return []
        return [item for cell in frame[column] for item in cell]

    @staticmethod
    def _column_total(frame, column):
        """Sum ``column``; ``0`` if the frame or the column is absent."""
        if frame is None or column not in frame.columns:
            return 0
        return frame[column].sum()

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Return ``numerator / denominator`` as a float, or 0.0 for a zero denominator."""
        if not denominator:
            return 0.0
        return float(numerator) / denominator

    def _generate_summary_metrics(self):
        """Compute headline statistics and store them in ``summary_metrics``.

        Returns:
            The metrics dict, which is also kept on ``self.summary_metrics``.
        """
        user_assigned_custom_roles = self._flatten_column(
            self.members_df, 'customRoles')
        team_assigned_custom_roles = self._flatten_column(
            self.teams_df, 'customRoleKeys')
        distinct_combined_roles = set(
            user_assigned_custom_roles + team_assigned_custom_roles)

        total_custom_role = 0 if self.roles_df is None else len(self.roles_df)
        total_users = 0 if self.members_df is None else len(self.members_df)
        total_teams = 0 if self.teams_df is None else len(self.teams_df)

        # Orphaned means defined but assigned to nobody. Comparing key sets,
        # rather than subtracting counts, stays correct when a team references
        # a role key that is not among the defined roles.
        if self.roles_df is not None and 'key' in self.roles_df.columns:
            defined_role_keys = set(self.roles_df['key'])
        else:
            defined_role_keys = set()
        orphaned_roles = len(defined_role_keys - distinct_combined_roles)

        member_role_assignments = self._column_total(
            self.members_df, 'customRoles_count')
        team_role_assignments = self._column_total(
            self.teams_df, 'customRoleKeys_count')
        total_permissions_count = self._column_total(
            self.roles_df, 'permission_count')

        self.summary_metrics = {
            'role_to_user_ratio': self._safe_ratio(member_role_assignments, total_users),
            'role_to_team_ratio': self._safe_ratio(team_role_assignments, total_teams),
            'total_custom_role': total_custom_role,
            'orphaned_roles': orphaned_roles,
            'permission_to_role_ratio': self._safe_ratio(total_permissions_count, total_custom_role),
            'user_assigned_custom_roles': user_assigned_custom_roles,
            'team_assigned_custom_roles': team_assigned_custom_roles,
            'total_assigned_roles': len(distinct_combined_roles),
            'distict_user_assigned_custom_roles': len(set(user_assigned_custom_roles)),
            'distict_team_assigned_custom_roles': len(set(team_assigned_custom_roles)),
        }
        return self.summary_metrics

    def _attach_role_assignees(self, assignee_roles, list_field, count_field):
        """Record on every role which assignees hold it, then rebuild ``roles_df``.

        Args:
            assignee_roles: Mapping of assignee id to the role keys it holds.
            list_field: Role column that receives the list of assignee ids.
            count_field: Role column that receives the number of assignees.
        """
        role_lookup = {
            role["key"]: role for role in self.roles_df.to_dict(orient="records")}
        # Initialise every role up front so the columns exist even when there
        # are no assignees at all.
        for role_data in role_lookup.values():
            role_data.setdefault(list_field, [])
            role_data.setdefault(count_field, 0)
        for assignee_id, role_keys in assignee_roles.items():
            # set(): an assignee is counted once per role even if the key is
            # listed more than once.
            for role_key in set(role_keys):
                role_data = role_lookup.get(role_key)
                if role_data is not None:
                    role_data[list_field].append(assignee_id)
                    role_data[count_field] += 1
        self.roles_df = pd.DataFrame(list(role_lookup.values()))

    def _update_members_assigned_roles(self):
        """Add ``members`` / ``members_count`` columns to ``roles_df``."""
        member_lookup = {m["_id"]: m["customRoles"]
                         for m in self.members_df.to_dict(orient="records")}
        self._attach_role_assignees(member_lookup, 'members', 'members_count')

    def _update_teams_assigned_roles(self):
        """Add ``teams`` / ``teams_count`` columns to ``roles_df``."""
        team_lookup = {t["key"]: t["customRoleKeys"]
                       for t in self.teams_df.to_dict(orient="records")}
        self._attach_role_assignees(team_lookup, 'teams', 'teams_count')

    def _prep_roles(self):
        """Index policies by role id, count their statements, build ``roles_df``.

        Each source role dict is mutated in place: ``permission_count`` is
        added.
        """
        roles = []
        for role in self.roles_source:
            # A missing or None policy is treated as an empty one.
            policy = role.get('policy') or []
            self.policies[role.get('_id')] = policy
            # Count the number of permission statements in the policy.
            role['permission_count'] = len(policy)
            roles.append(role)
        self.roles_df = pd.DataFrame(roles)

    def _days_from_today(self, unix_time_ms):
        """Return the whole days elapsed since a Unix timestamp in milliseconds."""
        today = datetime.datetime.today()
        input_dte = datetime.datetime.fromtimestamp(unix_time_ms / 1000)
        return (today - input_dte).days

    def _prep_members_item(self, iter):
        """Enrich one member dict in place with derived flags and counts.

        Args:
            iter: The member record. The parameter name shadows the builtin
                and is kept only so existing callers keep working.
        """
        member = iter

        grants = member.get('permissionGrants')
        teams = member.get('teams') or []
        role_keys = self.convert_role_id_to_key(member.get('customRoles'))
        has_grants = bool(grants)

        # Keys are assigned in a fixed order because it determines the column
        # order of the resulting DataFrame.
        member['quickstartStatus'] = ""
        member['hasPermissionGrants'] = has_grants
        # NOTE(review): any permission grant marks the member as a team
        # maintainer -- confirm that this is the intended meaning.
        member['isTeamMaintainer'] = has_grants
        member['customRoles_count'] = len(role_keys)
        member['hasCustomRoles'] = bool(role_keys)
        member['isTeamMember'] = bool(teams)
        member['team_list'] = [team['key'] for team in teams]
        member['teams_count'] = len(teams)
        # Role ids are replaced by role keys; unknown ids are dropped.
        member['customRoles'] = role_keys

        # A member that was never seen, or whose last-seen time predates its
        # creation, is treated as last seen at creation time.
        last_seen = member.get('_lastSeen')
        created = member.get('creationDate')
        if last_seen is None or (created is not None and last_seen < created):
            last_seen = created
        member['_lastSeen'] = last_seen
        member['days_since_last_seen'] = (
            self._days_from_today(last_seen) if last_seen is not None else None)

    def _prep_members(self):
        """Enrich every source member in place and build ``members_df``."""
        members = []
        for member in self.members_source:
            self._prep_members_item(member)
            members.append(member)
        self.members_df = pd.DataFrame(members)

    def _prep_teams(self):
        """Add ``customRoleKeys_count`` to every source team and build ``teams_df``."""
        teams = []
        for team in self.teams_source:
            # Normalise a missing or None key list to [] so later steps can
            # iterate it.
            role_keys = team.get('customRoleKeys') or []
            team['customRoleKeys'] = role_keys
            team['customRoleKeys_count'] = len(role_keys)
            teams.append(team)
        self.teams_df = pd.DataFrame(teams)

    def get_summary_metrics(self):
        """Return the metrics dict computed by the last ``process`` run."""
        return self.summary_metrics

    def get_members_df(self):
        """Return the members DataFrame (``None`` before ``process`` has run)."""
        return self.members_df

    def get_roles_df(self):
        """Return the roles DataFrame (``None`` before ``process`` has run)."""
        return self.roles_df

    def get_teams_df(self):
        """Return the teams DataFrame (``None`` before ``process`` has run)."""
        return self.teams_df

    def get_policies(self) -> dict:
        """Return the ``{role _id: policy}`` mapping."""
        return self.policies

    def _save_data(self, msg=None, output_dir='output'):
        """Write roles, teams, members and policies as JSON under ``output_dir``.

        Args:
            msg: Unused; kept for backward compatibility.
            output_dir: Target directory; ``None`` falls back to ``'output'``.
        """
        if output_dir is None:
            output_dir = 'output'
        prefix = "transformed-"
        Utils.save_data_to_file(
            self.roles_df.to_dict(orient="records"), f"{output_dir}/{prefix}roles.json")
        Utils.save_data_to_file(
            self.teams_df.to_dict(orient="records"), f"{output_dir}/{prefix}teams.json")
        Utils.save_data_to_file(
            self.members_df.to_dict(orient="records"), f"{output_dir}/{prefix}members.json")
        Utils.save_data_to_file(
            self.policies, f"{output_dir}/{prefix}policies.json")
def main():
    """Transform the exported JSON files in ./output and print role statistics.

    Reads ``roles.json``, ``teams.json`` and ``members.json``, runs the
    transformer, writes one ``transformer-out-*.json`` file per collection and
    prints each member's custom roles and the mean number of roles per member.
    """
    roles = Utils.read_json_file("./output/roles.json")
    teams = Utils.read_json_file("./output/teams.json")
    members = Utils.read_json_file("./output/members.json")

    # Transformer unpacks the mapping's values by position, so the insertion
    # order must be teams, roles, members.
    transformer = Transformer(
        ld_data={"teams": teams, "roles": roles, "members": members})
    transformer.process()

    # The transformed data lives in DataFrames; convert to plain records
    # before handing it to the JSON writer.
    Utils.save_data_to_file(
        transformer.get_roles_df().to_dict(orient="records"),
        "./output/transformer-out-roles.json")
    Utils.save_data_to_file(
        transformer.get_teams_df().to_dict(orient="records"),
        "./output/transformer-out-teams.json")
    Utils.save_data_to_file(
        transformer.get_members_df().to_dict(orient="records"),
        "./output/transformer-out-members.json")

    members_df = transformer.get_members_df()
    print(members_df['customRoles'])
    print(members_df['customRoles_count'].mean())


if __name__ == '__main__':
    main()