-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpktv.py
226 lines (195 loc) · 10.3 KB
/
pktv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import datetime
import json
import tkinter as tk
import Levenshtein
import yaml
import frameView
class PacketView:
    """Tkinter ladder (call-flow) diagram for a set of captured messages.

    ``my_dict`` is an iterable of dicts.  Each maps an identifier (the last
    one seen is stored as ``crnti_duid`` and shown in the window title) to a
    dict of message entries.  Entries whose value is a dict are treated as
    messages and must provide ``'src_node'``, ``'dst_node'`` and
    ``'frame_time'``; ``'packet'`` is read when the message label is clicked.
    Non-dict entries are ignored.

    Constructing an instance builds the window, draws the diagram and then
    blocks in the Tk main loop (see ``draw_arrows``) until the window closes.
    """

    _CTRL_MASK = 0x0004      # Control bit in a Tk event's ``state`` field
    _ZOOM_STEP = 1.1         # multiplicative zoom per wheel notch
    _SCROLL_EXTENT = 5000    # width/height of the unscaled scroll region
    _FIRST_ROW_Y = 140       # y coordinate of the first message arrow
    _ROW_STEP = 32.5         # vertical distance between consecutive arrows
    _LIFELINE_LENGTH = 5500  # length of the vertical line under each node
    _MESSAGE_FONT = ('Helvetica', 9, 'italic')

    def __init__(self, my_dict):
        """Collect node names from ``my_dict``, build the UI and run it."""
        self.my_dict = my_dict
        # ``default=0`` keeps an empty capture from raising ValueError.
        self.max_msg_length = max(
            (len(key)
             for message_dict in self.my_dict
             for node_dict in message_dict.values()
             for key in node_dict),
            default=0,
        )

        self.node_set = set()
        self.crnti_duid = None
        for message_dict in self.my_dict:
            for key, node_dict in message_dict.items():
                self.crnti_duid = key
                for message_value in node_dict.values():
                    if isinstance(message_value, dict):
                        self.node_set.add(f"{message_value['src_node']}")
                        self.node_set.add(f"{message_value['dst_node']}")
        print("node set", self.node_set)

        self.root = tk.Tk()
        self.root.geometry('1000x2000')
        # f-string instead of ``+`` so a missing or non-string id cannot raise.
        self.root.title(f"Call Flow for CNTI_DUID: {self.crnti_duid}")

        self.node_spacing = 130
        self.text_box_offset = 10
        self.canvas_width = 1500
        self.canvas_height = 1000
        # Column order of the node boxes.  The "0" entries presumably act as
        # spacer columns between real nodes -- TODO confirm with the author.
        self.preferred_order = ['NgNB-DU', '0', 'NgNB-CUCP', "0", 'NgNB-CUUP', "0", "NgAMF", "0", "xnCUCP"]
        self.node_centers = {}

        # All state must exist before draw_arrows(): it enters the Tk main
        # loop and only returns once the window has been closed.
        self.scale = 1.0  # current zoom factor
        self.node_name_txt = None

        self.create_widgets()
        self.draw_arrows()

    def scroll_canvas(self, event):
        """Scroll the canvas vertically in response to a mouse-wheel event."""
        if event.delta:
            self.canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")

    def zoom(self, event):
        """Zoom in on Ctrl+scroll up and out on Ctrl+scroll down.

        Only item coordinates are scaled (``Canvas.scale``); font sizes and
        line widths are unchanged.  Returns ``"break"`` when the event was
        handled so Tk does not run further bindings for it, otherwise
        ``None``.
        """
        if not (event.state & self._CTRL_MASK) or not event.delta:
            return None
        factor = self._ZOOM_STEP if event.delta > 0 else 1 / self._ZOOM_STEP
        self.scale *= factor
        self.canvas.scale("all", 0, 0, factor, factor)
        extent = self._SCROLL_EXTENT * self.scale
        self.canvas.configure(scrollregion=(0, 0, extent, extent))
        return "break"

    def create_widgets(self):
        """Create the scrollable canvas and draw one box + lifeline per node."""
        # Canvas with scrollbars for both directions.
        self.canvas = tk.Canvas(self.root, width=self.canvas_width, height=self.canvas_height,
                                scrollregion=(0, 0, self._SCROLL_EXTENT, self._SCROLL_EXTENT),
                                bg='white')
        self.xscrollbar = tk.Scrollbar(self.root, orient=tk.HORIZONTAL, command=self.canvas.xview)
        self.yscrollbar = tk.Scrollbar(self.root, orient=tk.VERTICAL, command=self.canvas.yview)
        self.canvas.config(xscrollcommand=self.xscrollbar.set, yscrollcommand=self.yscrollbar.set)

        self.xscrollbar.pack(side=tk.BOTTOM, fill=tk.X)
        self.yscrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.canvas.pack(side=tk.LEFT, expand=True, fill=tk.BOTH)

        # Frame embedded in the canvas at the origin.
        self.frame = tk.Frame(self.canvas, bg='white')
        self.canvas.create_window((0, 0), window=self.frame, anchor='nw')

        # Plain wheel scrolls; with Control held the more specific binding
        # (zoom) is the one Tk selects.
        self.canvas.bind_all("<MouseWheel>", self.scroll_canvas)
        self.canvas.bind_all("<Control-MouseWheel>", self.zoom)

        # Node type is the part of the node name before the first underscore,
        # the same rule draw_arrows() applies.
        self.node_dict = {node.split('_')[0] for node in self.node_set}
        unique_nodes = self._longest_of_similar(self.node_dict, threshold=1)
        print("unique_nodes", unique_nodes)

        for column, node_prefix in enumerate(self.preferred_order):
            node_names = [name for name in self.node_set if name.startswith(node_prefix)]
            for label in unique_nodes:
                # Several addresses may share one node type; draw it once.
                if any(name.startswith(label) for name in node_names):
                    self._draw_node(label, column)
        print("self.node_centers", self.node_centers)

    @staticmethod
    def _longest_of_similar(names, threshold):
        """Group names within ``threshold`` edits and return each group's longest.

        Names are visited in sorted order so the grouping does not depend on
        set iteration order.  A name joins the first group whose first member
        is within ``threshold`` Levenshtein distance, else starts a new group.
        """
        groups = {}
        for name in sorted(names):
            for representative, members in groups.items():
                if Levenshtein.distance(name, representative) <= threshold:
                    members.append(name)
                    break
            else:
                groups[name] = [name]
        return [max(members, key=len) for members in groups.values()]

    def _draw_node(self, label, column):
        """Draw the labelled box and lifeline for ``label`` in ``column``.

        Records the box's horizontal centre in ``self.node_centers[label]``.
        """
        x0 = column * self.node_spacing + 90
        y0 = 60
        # Temporary text item used only to measure the label.
        probe = self.canvas.create_text(x0, y0, text=f'{label}', anchor='n')
        left, top, right, bottom = self.canvas.bbox(probe)
        self.canvas.delete(probe)

        pad = self.text_box_offset
        x1, y1 = right + pad, bottom + pad
        self.canvas.create_rectangle(left - pad, top - pad, x1 - 1, y1 - 1,
                                     fill='pale goldenrod', outline='gray', width=2)
        center_x = (left + right) / 2
        self.canvas.create_text(center_x, (top + bottom) / 2, text=f'{label}', anchor='center')
        self.node_centers[label] = center_x
        # Vertical lifeline from the bottom of the box.
        self.canvas.create_line(center_x, y1, center_x, y1 + self._LIFELINE_LENGTH,
                                width=2, fill='gray')

    def on_click(self, event):
        """Show the packet behind the clicked message label.

        The message key is recovered from the label text; every entry in
        ``my_dict`` stored under that key is passed to ``packet_to_dict``.
        """
        current = event.widget.find_withtag('current')
        if not current:
            return
        label = event.widget.itemcget(current[0], 'text')
        wanted = self._label_to_message_key(label)
        for message_dict in self.my_dict:
            for node_dict in message_dict.values():
                entry = node_dict.get(wanted)
                if isinstance(entry, dict):
                    self.packet_to_dict(entry['packet'])

    @staticmethod
    def _label_to_message_key(label):
        """Invert ``_message_label``: ``'msg (fn) (time)'`` -> ``'msg_fn'``."""
        head = label[:label.rfind('(')]  # drop the trailing "(time)" part
        return head.replace(' (', '_').replace(')', '').replace(" ", "")

    @staticmethod
    def _message_label(message_key, time_str):
        """Build the arrow label for ``message_key`` (``'<name>_<frame no>'``).

        The key is split at its last underscore so names that themselves
        contain underscores are kept intact.  A key without an underscore is
        shown without the frame-number part.
        """
        name, sep, frame_no = message_key.rpartition("_")
        if not sep:
            return f"{message_key} ({time_str})"
        return f"{name} ({frame_no}) ({time_str})"

    @staticmethod
    def _format_frame_time(frame_time):
        """Return epoch seconds ``frame_time`` as a UTC ``HH:MM:SS`` string."""
        moment = datetime.datetime.fromtimestamp(float(frame_time), tz=datetime.timezone.utc)
        return moment.strftime('%H:%M:%S')

    @staticmethod
    def _nest_packet_fields(packet):
        """Group flat ``'layer.field'`` keys of ``packet`` by their first part.

        ``{'ip.src': x, 'frame': y}`` becomes
        ``{'ip': {'src': x}, 'frame': {'': y}}``; only the first dot splits.
        """
        nested = {}
        for key in packet:
            top_level_key, _, suffix = key.partition(".")
            nested.setdefault(top_level_key, {})[suffix] = packet[key]
        return nested

    def packet_to_dict(self, packet):
        """Display ``packet`` as pretty-printed JSON via ``frameView.frameView``."""
        output_str = json.dumps(self._nest_packet_fields(packet), indent=4)
        frameView.frameView(output_str)

    def reqformat(self, packet):
        """Return the filtered NGAP field lines of ``packet`` as YAML text.

        Keeps lines that contain ``':'`` and none of the structural markers
        below, strips and de-duplicates them, and dumps the sorted list.
        Not called anywhere in this class.
        """
        skipped_markers = ("id: id-", "protocolIEs", "criticality:", "Item ")
        kept = sorted({
            line.strip()
            for line in packet.ngap._get_all_field_lines()
            if ':' in line and not any(marker in line for marker in skipped_markers)
        })
        return yaml.dump(kept)

    def _center_for(self, node):
        """Return the lifeline x for ``node`` or ``None`` if it has none.

        ``node`` is matched by its type (text before the first underscore,
        at most 20 characters) against the drawn node labels; when several
        labels match, the last one wins.
        """
        prefix = node.split("_")[0][:20]
        center = None
        for label, x in self.node_centers.items():
            if label.startswith(prefix):
                center = x
        return center

    def draw_arrows(self):
        """Draw one labelled arrow per message, then run the Tk main loop.

        Messages are laid out top to bottom in iteration order, one row
        each.  A message whose source or destination has no lifeline is
        skipped.  This call blocks until the window is closed.
        """
        y = self._FIRST_ROW_Y
        for node_dict in self.my_dict:
            for messages in node_dict.values():
                for message_key, values in messages.items():
                    if not isinstance(values, dict):
                        continue
                    src_x = self._center_for(values['src_node'])
                    dst_x = self._center_for(values['dst_node'])
                    if src_x is None or dst_x is None:
                        continue  # no lifeline to attach the arrow to

                    self.canvas.create_line(src_x, y, dst_x, y, arrow='last', fill='black')
                    label = self._message_label(
                        message_key, self._format_frame_time(values['frame_time']))
                    text_item = self.canvas.create_text((src_x + dst_x) / 2, y - 2,
                                                        text=label, anchor='s',
                                                        font=self._MESSAGE_FONT, fill="blue")
                    self.canvas.tag_bind(text_item, '<Button-1>', self.on_click)
                    y += self._ROW_STEP
        self.root.mainloop()