-
Notifications
You must be signed in to change notification settings - Fork 151
/
Copy path__main__.py
141 lines (102 loc) · 4.3 KB
/
__main__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import logging
import subprocess as sp
import shutil
import warnings
import copy as cp
from pathlib import Path
import argschema
from allensdk.config.manifest import Manifest
from ._schemas import InputSchema, OutputSchema, available_hashers
from allensdk.brain_observatory.argschema_utilities import write_or_print_outputs
def hash_file(path, hasher_cls, chunk_size=1024 * 1024):
    """Compute the digest of a file's contents.

    The file is consumed in fixed-size chunks so that large files can be
    hashed without holding their entire contents in memory at once.

    Parameters
    ----------
    path : str or path-like
        File to hash. It is opened in binary mode.
    hasher_cls : callable
        Zero-argument factory returning an object that exposes
        ``update(data)`` and ``digest()`` (for example ``hashlib.sha256``).
    chunk_size : int, optional
        Number of bytes requested per read. Must be at least 1.

    Returns
    -------
    The value returned by the hasher's ``digest()`` method.

    Raises
    ------
    ValueError
        If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        # read(0) would return b'' immediately and silently hash nothing.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    with open(path, 'rb') as file_obj:
        hasher = hasher_cls()
        # iter() with a sentinel keeps reading until read() yields b'' (EOF).
        for chunk in iter(lambda: file_obj.read(chunk_size), b''):
            hasher.update(chunk)
        return hasher.digest()
def walk_fs_tree(root, fn):
    """Apply ``fn`` to ``root`` and, recursively, to everything beneath it.

    Parameters
    ----------
    root : str or path-like
        Starting point of the walk. It is converted to a ``pathlib.Path``
        before being handed to ``fn``.
    fn : callable
        Called with one ``Path`` argument per visited node. A directory is
        visited before any of its contents.
    """
    node = Path(root)
    fn(node)

    # Anything that is not a directory has no children to descend into.
    if not node.is_dir():
        return

    for child in node.iterdir():
        walk_fs_tree(child, fn)
def copy_file_entry(source, dest, use_rsync, make_parent_dirs, chmod=None):
    """Copy a file or directory from ``source`` to ``dest``.

    Parameters
    ----------
    source : str or path-like
        Existing file or directory to copy.
    dest : str or path-like
        Location to copy to.
    use_rsync : bool
        If truthy, copy by running ``rsync -a source dest``. Otherwise use
        ``shutil.copytree`` for directories and ``shutil.copy`` for files.
    make_parent_dirs : bool
        If truthy, call ``Manifest.safe_make_parent_dirs(dest)`` before
        copying.
    chmod : int or str, optional
        Permission digits, interpreted as octal (``775`` means ``0o775``).
        When given, the mode is applied to every node under the chmod root.
        The root is the value returned by ``safe_make_parent_dirs`` if it is
        not None (presumably the topmost directory it created - confirm
        against Manifest), otherwise ``dest``.
    """
    created_root = Manifest.safe_make_parent_dirs(dest) if make_parent_dirs else None

    if use_rsync:
        sp.check_call(['rsync', '-a', source, dest])
    elif Path(source).is_dir():
        shutil.copytree(source, dest)
    else:
        shutil.copy(source, dest)

    if chmod is not None:
        def set_mode(path):
            # Prefixing with "0o" makes int(..., 0) parse the digits as octal.
            path.chmod(int(f"0o{chmod}", 0))

        walk_fs_tree(dest if created_root is None else created_root, set_mode)

    logging.info(f"copied from {source} to {dest}")
def raise_or_warn(message, do_raise, typ=None):
    """Report a problem as either a warning or an exception.

    Parameters
    ----------
    message : str
        Text of the warning or exception.
    do_raise : bool
        When equal to ``False`` a warning is issued; any other value causes
        an exception to be raised.
    typ : type, optional
        Warning category or exception class to use. Defaults to
        ``UserWarning`` when warning and ``ValueError`` when raising.

    Raises
    ------
    typ (``ValueError`` by default)
        Whenever ``do_raise`` does not compare equal to ``False``.
    """
    # Equality rather than truthiness: values such as None do not compare
    # equal to False and therefore take the raising path.
    warn_only = do_raise == False  # noqa: E712

    if typ is None:
        typ = UserWarning if warn_only else ValueError

    if warn_only:
        warnings.warn(message, typ)
        return

    raise typ(message)
def compare(source, dest, hasher_cls, raise_if_comparison_fails):
    """Compare ``source`` against ``dest``, dispatching on whether they are directories.

    Parameters
    ----------
    source, dest : str or path-like
        The two filesystem entries to compare.
    hasher_cls : callable
        Hasher factory forwarded to the file/directory comparison.
    raise_if_comparison_fails : bool
        Forwarded to ``raise_or_warn`` to choose between raising and warning
        when a comparison fails.

    Returns
    -------
    The result of ``compare_directories`` when both are directories, the
    result of ``compare_files`` when neither is, and None when one is a
    directory and the other is not (and the problem was only warned about).
    """
    source_is_dir = Path(source).is_dir()
    dest_is_dir = Path(dest).is_dir()

    # A directory cannot be meaningfully compared with a non-directory.
    if source_is_dir != dest_is_dir:
        raise_or_warn(f"unable to compare files with directories: {source}, {dest}", raise_if_comparison_fails)
        return None

    comparator = compare_directories if source_is_dir else compare_files
    return comparator(source, dest, hasher_cls, raise_if_comparison_fails)
def compare_files(source, dest, hasher_cls, raise_if_comparison_fails):
    """Hash two files and report when their digests differ.

    Parameters
    ----------
    source, dest : str or path-like
        Files to hash with ``hash_file``.
    hasher_cls : callable
        Hasher factory; its ``__name__`` appears in the failure message.
    raise_if_comparison_fails : bool
        Forwarded to ``raise_or_warn`` when the digests differ.

    Returns
    -------
    tuple
        ``(source_hash, dest_hash)``, returned whether or not they match
        (provided the mismatch was only warned about).
    """
    digests = (hash_file(source, hasher_cls), hash_file(dest, hasher_cls))

    digests_match = digests[0] == digests[1]
    if not digests_match:
        raise_or_warn(f"comparison of {source} and {dest} using {hasher_cls.__name__} failed", raise_if_comparison_fails)

    return digests
def compare_directories(source, dest, hasher_cls, raise_if_comparison_fails):
    """Recursively compare the contents of two directories.

    Entries of each directory are sorted by name and compared pairwise: the
    names must agree, and each pair is then compared with ``compare``.

    Parameters
    ----------
    source, dest : str or path-like
        Directories whose contents are compared.
    hasher_cls : callable
        Hasher factory forwarded to ``compare``.
    raise_if_comparison_fails : bool
        Forwarded to ``raise_or_warn`` (and to ``compare``) to choose between
        raising and warning when a difference is found.

    Returns
    -------
    None
    """
    # Path.iterdir() yields paths that already include the parent directory,
    # so entries must be matched on their names alone; comparing the full
    # paths would flag every entry as a mismatch whenever source != dest.
    source_names = sorted(node.name for node in Path(source).iterdir())
    dest_names = sorted(node.name for node in Path(dest).iterdir())

    if len(source_names) != len(dest_names):
        raise_or_warn(
            f"{source} contains {len(source_names)} items while {dest} contains {len(dest_names)} items",
            raise_if_comparison_fails
        )

    # When only warning, zip() stops at the shorter listing.
    for source_name, dest_name in zip(source_names, dest_names):
        spath = str(Path(source, source_name))
        dpath = str(Path(dest, dest_name))

        if source_name != dest_name:
            raise_or_warn(f"mismatch between {spath} and {dpath}", raise_if_comparison_fails)

        compare(spath, dpath, hasher_cls, raise_if_comparison_fails)
def main(
    files,
    use_rsync=True,
    hasher_key=None,
    raise_if_comparison_fails=True,
    make_parent_dirs=True,
    chmod=775,
    **kwargs
):
    """Copy each requested entry and optionally verify the copy by hashing.

    Parameters
    ----------
    files : iterable of dict
        Each entry must provide ``'source'`` and ``'destination'`` keys.
    use_rsync : bool, optional
        Forwarded to ``copy_file_entry``.
    hasher_key : hashable, optional
        Key looked up in ``available_hashers``. When the lookup yields None
        no comparison is performed.
    raise_if_comparison_fails : bool, optional
        Forwarded to ``compare``.
    make_parent_dirs : bool, optional
        Forwarded to ``copy_file_entry``.
    chmod : int or str, optional
        Forwarded to ``copy_file_entry``.
    **kwargs
        Accepted and ignored.

    Returns
    -------
    dict
        ``{'files': [...]}`` holding a deep copy of every input entry. An
        entry gains ``'source_hash'`` and ``'destination_hash'`` (lists of
        ints) when ``compare`` returned a pair of digests for it.
    """
    hasher_cls = available_hashers[hasher_key]

    def process(entry):
        # Work on a copy so the caller's entry is never mutated.
        result = cp.deepcopy(entry)
        src, dst = entry['source'], entry['destination']

        copy_file_entry(src, dst, use_rsync, make_parent_dirs, chmod=chmod)

        if hasher_cls is None:
            return result

        digests = compare(src, dst, hasher_cls, raise_if_comparison_fails)
        if digests is not None:
            # Store the digests as plain lists of ints.
            result['source_hash'] = list(map(int, digests[0]))
            result['destination_hash'] = list(map(int, digests[1]))
        return result

    return {'files': [process(entry) for entry in files]}
if __name__ == '__main__':
    # Log lines carry a timestamp, the process id and the level.
    log_format = '%(asctime)s - %(process)s - %(levelname)s - %(message)s'
    logging.basicConfig(format=log_format)

    # Build the argument parser from this module's input/output schemas.
    arg_parser = argschema.ArgSchemaParser(schema_type=InputSchema, output_schema_type=OutputSchema)

    # Run the copy with the parsed arguments, then emit the results.
    results = main(**arg_parser.args)
    write_or_print_outputs(results, arg_parser)