-
Notifications
You must be signed in to change notification settings - Fork 1
/
status_result.py
127 lines (103 loc) · 4.72 KB
/
status_result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import zipfile
import json
import hashlib
# Save the status of the script to a JSON file
def save_status(status_file, status, archive_filename):
    """Write *status* to *status_file* as JSON and print a confirmation line.

    Args:
        status_file: Path of the JSON file to create or overwrite.
        status: JSON-serializable object holding the script status.
        archive_filename: Name used as the prefix of the printed message.

    Raises:
        TypeError: If *status* contains a value the ``json`` module cannot
            encode. The existing file, if any, is left untouched.
        OSError: If the file cannot be opened or written.
    """
    # Serialize first: opening with 'w' truncates the file immediately, so
    # encoding straight into the handle would wipe the previous status (or
    # leave half a document) whenever *status* turns out not to be encodable.
    serialized = json.dumps(status, indent=4)
    with open(status_file, 'w', encoding='utf-8') as f:
        f.write(serialized)
    print(f'{archive_filename}: Script status saved to', status_file)
# Build tree structure and add the item to the tree
def build_tree(path, contents):
    """Insert the entry designated by *path* into the tree *contents*, in place.

    The tree is a list of nodes. A file node is
    ``{"type": "file", "name": ...}`` and a directory node is
    ``{"type": "directory", "name": ..., "children": [...]}`` where
    ``children`` is again a list of nodes.

    Args:
        path: '/'-separated path. Every component but the last names a
            directory; the last names a file. When the path ends with '/',
            every component names a directory and no file node is added.
            Empty components (leading, doubled or trailing '/') are ignored,
            so an empty path is a no-op.
        contents: List of nodes to update. Missing directories are created;
            an existing directory or file node with the same name is reused,
            so adding the same path twice does not create duplicates.
    """
    names = [name for name in path.split('/') if name]
    if not names:
        return

    # A trailing '/' marks a directory entry: there is no file component.
    names_a_directory = path.endswith('/')
    directory_names = names if names_a_directory else names[:-1]

    level = contents
    for directory_name in directory_names:
        for node in level:
            if node["type"] == "directory" and node["name"] == directory_name:
                break
        else:
            node = {"type": "directory", "name": directory_name, "children": []}
            level.append(node)
        level = node["children"]

    if names_a_directory:
        return

    file_name = names[-1]
    already_listed = any(
        node["type"] == "file" and node["name"] == file_name for node in level
    )
    if not already_listed:
        level.append({"type": "file", "name": file_name})
# Build the tree structure of a zip archive and attach an MD5 checksum to each file
def build_tree_and_save_checksums(downld_file):
    """Return the file tree of the zip archive *downld_file* with checksums.

    Every archive member whose name does not end with '/' is hashed with MD5,
    inserted into the tree through ``build_tree`` and annotated with a
    ``"checksum"`` key holding the hex digest. Nothing is written to disk;
    the tree is only returned.

    Args:
        downld_file: Path or file-like object accepted by ``zipfile.ZipFile``.

    Returns:
        The list of top-level nodes built by ``build_tree``.

    Raises:
        zipfile.BadZipFile: If *downld_file* is not a valid zip archive.
    """
    archive_contents = []
    with zipfile.ZipFile(downld_file, 'r') as zip_ref:
        for file_name in zip_ref.namelist():
            if file_name.endswith('/'):  # Skip directories
                continue
            checksum = _md5_of_member(zip_ref, file_name)
            build_tree(file_name, archive_contents)
            _attach_checksum(archive_contents, file_name, checksum)
    return archive_contents


def _md5_of_member(zip_ref, member_name, chunk_size=65536):
    """Return the MD5 hex digest of one archive member, read in chunks."""
    digest = hashlib.md5()
    with zip_ref.open(member_name, 'r') as member:
        # Chunked reads keep memory use flat regardless of the member's size.
        while True:
            chunk = member.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()


def _attach_checksum(tree, path, checksum):
    """Set ``"checksum"`` on the file node that *path* designates in *tree*."""
    *directory_names, file_name = path.split('/')
    level = tree
    for directory_name in directory_names:
        for node in level:
            # Only descend into directories: a file that happens to share the
            # component's name must not be mistaken for it.
            if node["type"] == "directory" and node["name"] == directory_name:
                level = node["children"]
                break
        # A component without a matching directory leaves the search on the
        # current level and moves on to the next component.
    # Search newest-first so that, if the same name is listed more than once,
    # the most recently appended node receives the checksum.
    for node in reversed(level):
        if node["type"] == "file" and node["name"] == file_name:
            node["checksum"] = checksum
            return
# Compare checksums between old and new files
def compare_checksums(old_checksums, new_checksums):
    """Diff two trees of file/directory nodes.

    Both arguments are lists of nodes: dictionaries with a ``"type"`` of
    ``"file"`` or ``"directory"``, a ``"name"``, and either a ``"checksum"``
    (files) or a ``"children"`` list (directories). Nodes without a
    ``"name"`` key are ignored.

    Returns:
        A dictionary with the keys ``"added"``, ``"removed"`` and
        ``"modified"``. Each value is a list of shallow copies of the
        affected nodes whose ``"name"`` has been replaced by the
        '/'-joined path from the root of the tree.

        * A name present only in the new tree is *added*.
        * A name present only in the old tree is *removed*.
        * A file present in both trees with a different checksum is
          *modified* (a missing checksum counts as ``None``).
        * A name whose type differs between the trees is reported as the
          old node *removed* and the new node *added*.
        * Directories present in both trees are compared recursively.
    """
    result = {"added": [], "removed": [], "modified": []}

    def with_full_path(item, full_path):
        # Shallow copy so the caller's tree keeps its relative names.
        item_copy = item.copy()
        item_copy["name"] = full_path
        return item_copy

    def compare_directories(old_dir, new_dir, path=""):
        old_files = {item["name"]: item for item in old_dir if "name" in item}
        new_files = {item["name"]: item for item in new_dir if "name" in item}

        for name, new_item in new_files.items():
            full_path = f"{path}/{name}" if path else name
            if name not in old_files:
                result["added"].append(with_full_path(new_item, full_path))
                continue

            old_item = old_files[name]
            if new_item["type"] != old_item["type"]:
                # Same name, different kind (file <-> directory): the old
                # entry is gone and a new one took its place.
                result["removed"].append(with_full_path(old_item, full_path))
                result["added"].append(with_full_path(new_item, full_path))
            elif new_item["type"] == "directory":
                compare_directories(
                    old_item["children"], new_item["children"], full_path
                )
            elif new_item["type"] == "file":
                # .get() tolerates file nodes that carry no checksum.
                if new_item.get("checksum") != old_item.get("checksum"):
                    result["modified"].append(with_full_path(new_item, full_path))

        for name, old_item in old_files.items():
            if name not in new_files:
                full_path = f"{path}/{name}" if path else name
                result["removed"].append(with_full_path(old_item, full_path))

    compare_directories(old_checksums, new_checksums)
    return result
# Save comparison results to a JSON file
def save_comparison_results(results, output_file):
    """Write *results* to *output_file* as JSON indented by four spaces.

    Args:
        results: JSON-serializable comparison results.
        output_file: Path of the JSON file to create or overwrite.

    Raises:
        TypeError: If *results* contains a value the ``json`` module cannot
            encode. The existing file, if any, is left untouched.
        OSError: If the file cannot be opened or written.
    """
    # Serialize before opening: mode 'w' truncates the file at once, so an
    # encoding failure must happen before the old content is discarded.
    serialized = json.dumps(results, indent=4)
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(serialized)
def process_items(input_items):
    """Fold tree nodes into one nested dictionary keyed by path component.

    Args:
        input_items: Either a list of nodes, or a dictionary whose values are
            lists of nodes (the keys are ignored and all lists are merged
            into the same result). A node's ``"name"`` may contain '/'
            separators; a node with a ``"children"`` key becomes a nested
            dictionary, any other node becomes a string label.

    Returns:
        A nested dictionary. Directory-like nodes map to dictionaries; other
        nodes map to ``"<name> (<checksum>)"`` when they carry a
        ``"checksum"`` and to ``"<name>"`` otherwise.
    """
    def place(entry, root):
        *parent_names, leaf_name = entry["name"].split('/')

        # Walk down to the dictionary that should hold the leaf, creating the
        # intermediate levels on the way.
        node = root
        for parent_name in parent_names:
            if parent_name not in node:
                node[parent_name] = {}
            node = node[parent_name]

        if "children" in entry:
            # A fresh dictionary replaces whatever was stored under this name.
            subtree = {}
            node[leaf_name] = subtree
            for child in entry["children"]:
                place(child, subtree)
            return

        if "checksum" in entry:
            node[leaf_name] = leaf_name + " (" + entry["checksum"] + ")"
        else:
            node[leaf_name] = leaf_name

    if isinstance(input_items, dict):
        batches = input_items.values()
    else:
        batches = (input_items,)

    merged = {}
    for batch in batches:
        for entry in batch:
            place(entry, merged)
    return merged