forked from awaregroup/powerbi-vcs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pbivcs.py
161 lines (131 loc) · 6.46 KB
/
pbivcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# 1: Have scripts which extract a .pbit to a .pbit.extract folder - gitignore the .pbit (and .pbix) - AND create a .pbix.chksum (which is only useful for versioning purposes - one can confirm the state of their .pbix).
# - The script basically extracts the .pbit to a new folder .pbit.extract, but a) also extracts double-zipped content, and b) formats everything nicely so it's readable/diffable/mergeable.
# 2: Have git hooks which check, before a commit:
# - that the .pbit.extract folder is up to date with the latest .pbit (i.e. they match exactly - it is not the case that the .pbit has been exported but the user forgot to run the extract script);
# - and which add a warning (with y/n continue feedback) if the .pbix has been updated *after* the latest .pbit.extract was updated (i.e. the user maybe forgot to export the latest .pbit and extract, or exported the .pbit but forgot to extract). Note that this will be obvious in the case of only a single change (as it were) - since .pbix files aren't tracked, the user will see no changes to git-tracked files.
import zipfile
import os
import shutil
import sys
import fnmatch
import converters
# Ordered table of (fnmatch pattern, converter) pairs consulted by find_converter().
# Order matters: the first pattern that matches an archive member name wins, so the
# catch-all '*.json' entry only applies to names not matched by an earlier row.
# NOTE(review): the positional arguments look like text encodings (plus a flag for
# XMLConverter) - confirm against the converters module.
CONVERTERS = [
    ('DataModelSchema', converters.JSONConverter('utf-16-le')),
    ('DiagramState', converters.JSONConverter('utf-16-le')),
    ('Report/Layout', converters.JSONConverter('utf-16-le')),
    ('Report/LinguisticSchema', converters.XMLConverter('utf-16-le', False)),
    # '[[]' and '[]]' are the fnmatch spellings of a literal '[' and ']'
    ('[[]Content_Types[]].xml', converters.XMLConverter('utf-8-sig', True)),
    ('SecurityBindings', converters.NoopConverter()),
    ('Settings', converters.NoopConverter()),
    ('Version', converters.NoopConverter()),
    # NOTE(review): this pattern has no wildcard, so fnmatch only matches this exact
    # name and not files beneath the folder - confirm whether
    # 'Report/StaticResources/*' was intended.
    ('Report/StaticResources/', converters.NoopConverter()),
    ('DataMashup', converters.DataMashupConverter()),
    ('Metadata', converters.MetadataConverter()),
    ('*.json', converters.JSONConverter('utf-8'))
]
def find_converter(path):
    """
    Return the converter registered for the archive member name `path`.

    CONVERTERS is scanned in order and the converter paired with the first
    pattern that fnmatch-matches `path` is returned. When no pattern matches,
    a new converters.NoopConverter() is returned instead.
    """
    for glob, handler in CONVERTERS:
        if fnmatch.fnmatch(path, glob):
            # first hit wins - later rows are never consulted
            return handler
    return converters.NoopConverter()
def extract_pbit(pbit_path, outdir, overwrite):
    """
    Convert a pbit to vcs format.

    Every member of the archive at `pbit_path` is handed to the converter
    chosen by find_converter() and written beneath `outdir`. The member names,
    in the order they appear in the archive, are recorded one per line in
    `outdir`/.zo.

    :param pbit_path: path of the .pbit (zip) archive to read
    :param outdir: directory to create and populate
    :param overwrite: if true, anything already at `outdir` is deleted first;
        if false, an existing `outdir` is an error
    :raises Exception: if `outdir` exists and `overwrite` is false
    """
    # TODO: check ends in pbit
    # TODO: check all expected files are present (in the right order)

    # wipe output directory and create:
    if os.path.exists(outdir):
        if not overwrite:
            raise Exception('Output path "{0}" already exists'.format(outdir))
        if os.path.isdir(outdir):
            shutil.rmtree(outdir)
        else:
            # rmtree refuses plain files, so a stale file needs os.remove
            os.remove(outdir)
    os.mkdir(outdir)

    # no compression argument: it only affects writing, and this archive is read-only
    with zipfile.ZipFile(pbit_path) as zd:
        # read items (in the order they appear in the archive)
        order = zd.namelist()
        for name in order:
            outpath = os.path.join(outdir, name)
            # get converter:
            conv = find_converter(name)
            # convert
            conv.write_raw_to_vcs(zd.read(name), outpath)

    # write order file (closed deterministically so the content is flushed
    # before anything else reads it):
    with open(os.path.join(outdir, ".zo"), 'w') as f:
        f.write("\n".join(order))
def compress_pbit(extracted_path, compressed_path, overwrite):
    """
    Convert a vcs store to valid pbit.

    The member names listed in `extracted_path`/.zo are written, in that order,
    into a new deflate-compressed zip at `compressed_path`. Each member's
    content is produced by the converter chosen by find_converter().

    :param extracted_path: directory holding the vcs-format files and .zo
    :param compressed_path: path of the archive to create
    :param overwrite: if true, an existing `compressed_path` is removed first;
        if false, an existing `compressed_path` is an error
    :raises Exception: if `compressed_path` exists and `overwrite` is false
    """
    # TODO: check all paths exists
    if os.path.exists(compressed_path):
        if not overwrite:
            raise Exception('Output path "{0}" already exists'.format(compressed_path))
        os.remove(compressed_path)

    # get order; blank entries (an empty .zo, or one with a trailing newline)
    # are skipped because they do not name an archive member
    with open(os.path.join(extracted_path, ".zo")) as f:
        order = [name for name in f.read().split("\n") if name]

    with zipfile.ZipFile(compressed_path, mode='w',
                         compression=zipfile.ZIP_DEFLATED) as zd:
        for name in order:
            # get converter:
            conv = find_converter(name)
            # convert
            with zd.open(name, 'w') as z:
                conv.write_vcs_to_raw(os.path.join(extracted_path, name), z)
def textconv_pbit(pbit_path, outio):
    """
    Convert a pbit to a text format suitable for diffing.

    For each member of the archive at `pbit_path`, in archive order, a
    "Filename: <name>" line is printed to `outio`, and then the converter
    chosen by find_converter() writes that member's textconv form to `outio`.

    :param pbit_path: path of the .pbit (zip) archive to read
    :param outio: writable text stream that receives the output
    """
    # TODO: check ends in pbit

    # no compression argument: it only affects writing, and this archive is read-only
    with zipfile.ZipFile(pbit_path, mode='r') as zd:
        # read items (in the order they appear in the archive)
        for name in zd.namelist():
            print("Filename: " + name, file=outio)
            # get converter:
            conv = find_converter(name)
            # convert
            conv.write_raw_to_textconv(zd.read(name), outio)
def _find_confs(path):
    """
    Find all .pbivcs.conf files (if any) in the directories above `path`,
    ordered by hierarchy with the nearest first, i.e.
    '/path/to/my/.pbivcs.conf' would come before '/path/to/.pbivcs.conf'.

    `path` itself is never searched, only its ancestors up to and including
    the filesystem root.

    NOTE(review): configargparse documents that later default config files
    take precedence over earlier ones - confirm that nearest-first is the
    order the caller wants.

    :param path: file or directory path; made absolute before walking
    :returns: list of existing .pbivcs.conf paths, nearest ancestor first
    """
    confs = []
    current = os.path.abspath(os.path.normpath(path))
    parent = os.path.dirname(current)
    # os.path.dirname() of a root returns the root itself, which ends the walk
    while parent != current:
        confpath = os.path.join(parent, '.pbivcs.conf')
        if os.path.exists(confpath):
            confs.append(confpath)
        current, parent = parent, os.path.dirname(parent)
    return confs
if __name__ == '__main__':
    # Command-line entry point: extract (-x, the default), compress (-c) or
    # textconv (-s) a pbit, with option defaults optionally supplied by
    # .pbivcs.conf files found above the input path.

    import configargparse

    parser = configargparse.ArgumentParser(description="A utility for converting *.pbit files to and from a VCS-friendly format")
    parser.add_argument('input', type=str, help="the input path")
    parser.add_argument('output', type=str, help="the output path", nargs="?", default=None)
    parser.add_argument('-x', action='store_true', dest="extract", default=True, help="extract pbit at INPUT to VCS-friendly format at OUTPUT")
    parser.add_argument('-c', action='store_false', dest="extract", default=True, help="compress VCS-friendly format at INPUT to pbit at OUTPUT")
    parser.add_argument('-s', action='store_true', dest="textconv", default=False, help="extract pbit at INPUT to textconv format on stdout")
    parser.add_argument('--over-write', action='store_true', dest="overwrite", default=False, help="if present, allow overwriting of OUTPUT. If not, will fail if OUTPUT exists")

    # parse args first to get input path:
    input_path = parser.parse_args().input

    # now set config files for parser (they depend on the input path, so they
    # can only be supplied after the first parse):
    parser._default_config_files = _find_confs(input_path)

    # now parse again to get final args:
    args = parser.parse_args()

    if args.textconv:
        textconv_pbit(args.input, sys.stdout)
    else:
        if args.output is None:
            parser.error('the following arguments are required: output')
        # Compare resolved paths rather than raw strings, so that different
        # spellings of one location (e.g. "out" and "./out") are also rejected;
        # otherwise --over-write would delete the input before it is read.
        resolved_input = os.path.normcase(os.path.realpath(args.input))
        resolved_output = os.path.normcase(os.path.realpath(args.output))
        if resolved_input == resolved_output:
            parser.error('Error! Input and output paths cannot be same')
        if args.extract:
            extract_pbit(args.input, args.output, args.overwrite)
        else:
            compress_pbit(args.input, args.output, args.overwrite)