forked from mdooley/wtpa2-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwtpa2.py
282 lines (232 loc) · 8.71 KB
/
wtpa2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
from sys import version_info
if version_info < (2,7):
print("Python version mismatch! Currently only Python 2.7+ is supported.")
print('')
import argparse
import os
import aifc
import struct
import stat
import platform
from tempfile import NamedTemporaryFile
from contextlib import closing
from subprocess import call
from distutils import spawn
class WTPA2:
r"""WTPA2 Sample Packer/Unpacker
"""
def __init__(self):
r"""Initialize the WTPA2 object
"""
self.num_samples = 0
self.toc_offset = 16
self.header = bytearray(512)
self.sox_cmd = spawn.find_executable("sox") # False to disable
def pack(self, outfile, infiles):
r""" Pack AIFF samples into the WTPA2 format.
outfile is the file to be written. infiles is a list of files and
directories to process for AIFFs in the proper format.
"""
self.header[0:3] = b"WTPA"
self.header[4:7] = b"SAMP"
self.outfile = open(outfile, "wb")
self.outfile.seek(512)
for infile in infiles:
if os.path.exists(infile):
if os.path.isdir(infile):
self.process_dir(infile)
else:
self.process_file(infile)
else:
print("{} does not exist, skipping...".format(infile))
self.outfile.seek(0)
self.outfile.write(self.header[0:512])
self.outfile.close()
def packmap(self, outfile, mapfile):
r""" Pack AIFF samples into the WTPA2 format, mapped to midi notes.
outfile is the file to be written. mapfile is a txt file with one
filename per line where each line-1 corresponds to a sample slot.
"""
self.header[0:3] = b"WTPA"
self.header[4:7] = b"SAMP"
self.outfile = open(outfile, "wb")
self.outfile.seek(512)
with closing(open(mapfile, "r")) as f:
infiles = f.readlines()
for infile in infiles:
infile = infile.strip()
if infile and os.path.exists(infile):
self.process_file(infile) or self.skip_current_slot()
else:
self.skip_current_slot()
self.outfile.seek(0)
self.outfile.write(self.header)
self.outfile.close()
def unpack(self, src, dest, samples=512):
r""" Extract samples from the provided file/device.
The src parameter is a binary file or device to attempt to extract
samples from. dest is a directory where any extracted samples
will be written. samples is an optional argument to limit the number
of slots to examine (default is to look at all of them).
"""
if not os.path.exists(src):
print("ERROR: {} does not exist.".format(src))
src = ""
elif os.path.isfile(src):
print("Reading from file {}.".format(src))
else:
if platform.system() == "Windows":
if len(src) == 2 and os.path.isdir(src):
src = "\\\\.\\" + src
print("Reading from drive {}.".format(src))
else:
print("ERROR: {} is not a drive.".format(src))
src = ""
elif platform.system() == "Darwin" or platform.system() == "Linux":
mode = os.stat(src).st_mode
if stat.S_ISBLK(mode):
print("Reading from device {}.".format(src))
else:
print("ERROR: {} is not a device".format(src))
src = ""
if not os.path.exists(dest):
try:
os.makedirs(dest)
except OSError:
print("ERROR: Creating directory {} failed.".format(dest))
dest = ""
pass
elif not os.path.isdir(dest):
print("ERROR: {} exists, but is not a directory.".format(dest))
dest = ""
if not (src == "" or dest == ""):
self.outfile = open(src, "rb")
self.header = self.outfile.read(512)
if self.header[0:4] != b"WTPA":
print("ERROR: WTPA data not found.")
elif self.header[4:8] != b"SAMP":
print("ERROR: WTPA Sample data not found.")
else:
for x in range(0, samples):
if self.sample_in_slot(x):
self.seek_to_slot(x)
f = os.path.join(dest, "%03d.aiff" % (x))
size = struct.unpack('>I', self.outfile.read(4))
print("Writing Sample from slot {} to {}.".format(x, f))
s = aifc.open(f, "w")
s.aiff()
s.setnchannels(1)
s.setsampwidth(1)
s.setframerate(22050)
s.setnframes(size[0])
s.writeframes(self.outfile.read(size[0]))
s.close()
self.outfile.close()
def seek_to_slot(self, slot):
r""" Seeks to the target slot in current file/device being processed.
"""
self.outfile.seek(512)
self.outfile.seek(((512*1024)*slot), 1)
def sample_in_slot(self, slot):
r""" Returns true if the header indicates a sample is present in the target slot.
"""
toc = self.header[ int( self.toc_offset + (slot/8) ) ]
if type(toc) is str: # 2.7 gives a binary string, 3.0 is an int
toc = ord( toc )
if toc & 1 << slot:
return True
else:
return False
def process_dir(self, path):
r""" Process a directory passed to the pack function.
Any files found in the target directory will be processed first, in
alphabetical order. Any sub-directories will then be processed, also
in alphabetical order.
"""
print("Processing directory {}...".format(path))
files = [ f for f in os.listdir(path) if os.path.isfile(os.path.join(path,f)) ]
dirs = [ f for f in os.listdir(path) if os.path.isdir(os.path.join(path,f)) ]
files.sort()
dirs.sort()
for f in files:
self.process_file(os.path.join(path,f))
for d in dirs:
self.process_dir(os.path.join(path,d))
def process_file(self, path):
r""" Process a file to be packed.
A file is packed only if it is an AIFF with the following properties:
- sample width of 1 byte (8 bits)
- mono
- less than 512K
An AIFF with an non-recommended sample rate is still packed, but a
warning is displayed.
"""
print("Processing file {}...".format(path))
try:
with closing(aifc.open(path)) as s:
if self.sox_cmd and (s.getsampwidth() != 1 or s.getnchannels() != 1 or s.getframerate() != 22050):
return self.soxify_and_process_file(path)
if s.getsampwidth() != 1:
print(" SKIPPED: Sample width = {}, should be 1.".format(s.getsampwidth()))
return False
elif s.getnchannels() != 1:
print(" SKIPPED: Number of channels = {}, should be 1.".format(s.getnchannels()))
return False
elif s.getnframes() > 512*1024:
print(" SKIPPED: Sample too long, length = {}, max is {}.".format(s.getnframes(), 512*1024))
return False
else:
if s.getframerate() != 22050:
print(" WARNING: Incorrect sample rate, rate = {} target is 22050".format(s.getframerate()))
self.outfile.write(struct.pack('>I', s.getnframes()))
self.outfile.write(s.readframes(s.getnframes()))
self.outfile.seek(512*1024 - s.getnframes() - 4, 1)
self.header[ int( self.toc_offset + ((self.num_samples)/8) ) ] |= (1 << (self.num_samples%8))
self.num_samples+=1
print(" OK: Wrote sample to slot {}".format(self.num_samples-1))
return True
except aifc.Error:
if self.sox_cmd:
return self.soxify_and_process_file(path)
else:
print(" SKIPPED: Not an AIFF".format(path))
return False
pass
def soxify(self, path):
print(" Soxifying".format(path))
temp_aiff = NamedTemporaryFile(mode='w+b', suffix='.aiff', delete=True)
call([self.sox_cmd, path, "-b 8", "-r 22050", "-c 1", temp_aiff.name])
return temp_aiff
def soxify_and_process_file(self, path):
with closing(self.soxify(path)) as temp_aiff:
return self.process_file(temp_aiff.name)
def skip_current_slot(self):
print("Skipping slot {}".format(self.num_samples))
self.header[ int( self.toc_offset + ((self.num_samples)/8) ) ] |= 0
self.num_samples+=1
self.seek_to_slot(self.num_samples)
def slot_type(x):
x = int(x)
if x <= 0 or x > 512:
raise argparse.ArgumentTypeError("Valid slot range 1 - 512")
return x
parser = argparse.ArgumentParser(description="WTPA2 file utility. Pack/Extract samples to/from the WTPA2 format.")
sparsers = parser.add_subparsers(dest="command", title="sub-commands")
p_parser = sparsers.add_parser("pack", help="pack AIFFs into a WTPA2 readable binary file")
p_parser.add_argument("outfile", type=str, help="output file name")
p_parser.add_argument("infiles", nargs="+", type=str, help="input directories/files")
m_parser = sparsers.add_parser("packmap", help="pack AIFFs into a WTPA2 readable binary file with map file")
m_parser.add_argument("outfile", type=str, help="output file name")
m_parser.add_argument("mapfile", type=str, help="map file name")
e_parser = sparsers.add_parser("extract", help="extract samples from a WTPA2 formatted binary file or device")
e_parser.add_argument("-s", "--slots", default=512, type=slot_type, help="limit sample slots read")
e_parser.add_argument("src", type=str, help="input file or path to device")
e_parser.add_argument("dest", type=str, help="output directory")
args = parser.parse_args()
wtpa = WTPA2()
if args.command == "pack":
wtpa.pack(args.outfile, args.infiles)
elif args.command == "packmap":
wtpa.packmap(args.outfile, args.mapfile)
elif args.command == "extract":
wtpa.unpack(args.src, args.dest, samples=args.slots)