-
Notifications
You must be signed in to change notification settings - Fork 9
/
moofimage.py
executable file
·388 lines (335 loc) · 15.1 KB
/
moofimage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
#!/usr/bin/env python3
#(c) 2022 by 4am
#license:MIT
import wozardry # https://github.com/a2-4am/wozardry
import bitarray # https://pypi.org/project/bitarray/
import sys
def myhex(b):
    """Return integer b as uppercase hexadecimal, zero-padded to two digits."""
    # hex() prefixes its result with "0x"; strip that before padding
    digits = hex(b)[2:].upper()
    return digits.rjust(2, "0")
class MoofTrack(wozardry.Track):
    """Read cursor over one track's bitstream.

    The bitstream is treated as circular: moving the cursor past either end
    wraps it around and adjusts ``revolutions`` accordingly.
    """

    def __init__(self, raw_bytes, raw_count):
        """Unpack raw_bytes into ``self.bits``, keeping at most raw_count bits."""
        wozardry.Track.__init__(self, raw_bytes, raw_count)
        self.bit_index = 0
        self.revolutions = 0
        # self.raw_bytes / self.raw_count are presumably stored by the parent
        # initializer -- TODO confirm against wozardry.Track
        unpacked = bitarray.bitarray(endian="big")
        unpacked.frombytes(self.raw_bytes)
        # discard trailing bits beyond the declared bit count
        while len(unpacked) > raw_count:
            unpacked.pop()
        self.bits = unpacked

    def rewind(self, bit_count=1):
        """Move the cursor back by bit_count bits, wrapping below zero."""
        position = self.bit_index - bit_count
        wraps = 0
        while position < 0:
            position += self.raw_count
            wraps += 1
        self.bit_index = position
        self.revolutions -= wraps

    def forward(self, bit_count=1):
        """Move the cursor ahead by bit_count bits, wrapping at the end."""
        position = self.bit_index + bit_count
        wraps = 0
        while position >= self.raw_count:
            position -= self.raw_count
            wraps += 1
        self.bit_index = position
        self.revolutions += wraps

    def bit(self):
        """Generator yielding the single bit under the cursor.

        The cursor is advanced by one bit when the value is requested, so
        callers use ``next(track.bit())`` once per bit.
        """
        current = self.bits[self.bit_index]
        self.forward()
        yield current

    def nibble(self):
        """Generator yielding one 8-bit nibble read from the cursor.

        Zero bits are skipped until a 1 bit is found; that bit becomes the
        high bit and the following seven bits fill in the rest.
        """
        # NOTE(review): on a track that contains no 1 bits this loop never
        # terminates -- confirm such tracks cannot reach this code.
        while True:
            if next(self.bit()):
                break
        value = 0b10000000
        for shift in reversed(range(7)):
            value |= next(self.bit()) << shift
        yield value

    def find(self, sequence):
        """Advance until the most recent nibbles equal sequence.

        Returns True with the cursor just past the match, or False once two
        further revolutions have gone by without one.
        """
        deadline = self.revolutions + 2
        target = tuple(sequence)
        window = [0] * len(sequence)
        while self.revolutions < deadline:
            # slide the window: drop the oldest nibble, append the newest
            window.pop(0)
            window.append(next(self.nibble()))
            if tuple(window) == target:
                return True
        return False

    def find_this_not_that(self, good, bad):
        """Advance until either good or bad is matched.

        Returns True if good is matched first; False if bad is matched first
        (bad is tested before good on each nibble) or if two further
        revolutions go by without either.
        """
        deadline = self.revolutions + 2
        good = tuple(good)
        bad = tuple(bad)
        good_window = [0] * len(good)
        bad_window = [0] * len(bad)
        while self.revolutions < deadline:
            good_window.pop(0)
            bad_window.pop(0)
            latest = next(self.nibble())
            good_window.append(latest)
            bad_window.append(latest)
            if tuple(bad_window) == bad:
                return False
            if tuple(good_window) == good:
                return True
        return False
class MoofAddressField:
    """Plain record holding the values decoded from one address field."""

    def __init__(self, valid, volume, track_id, sector_id):
        """Store the four values unchanged as same-named attributes."""
        (self.valid,
         self.volume,
         self.track_id,
         self.sector_id) = (valid, volume, track_id, sector_id)
class MoofDataField:
    """Plain record holding the values decoded from one data field."""

    def __init__(self, valid, sector_id, tags, data):
        """Store the four values unchanged as same-named attributes."""
        (self.valid,
         self.sector_id,
         self.tags,
         self.data) = (valid, sector_id, tags, data)
class MoofBlock:
    """Pairs an address field with the data field that followed it."""

    def __init__(self, address_field, data_field):
        """Store both fields unchanged as same-named attributes."""
        self.address_field, self.data_field = address_field, data_field
class MoofRWTS:
    """Finds and decodes address fields and data fields on a track.

    The prologue/epilogue nibble sequences and the nibble translation table
    can be overridden per instance; the class-level kDefault* values apply
    otherwise. Track arguments only need ``nibble()``, ``find()`` and
    ``find_this_not_that()`` methods.
    """

    kDefaultAddressPrologue16 = (0xD5, 0xAA, 0x96)
    kDefaultAddressEpilogue16 = (0xDE, 0xAA)
    kDefaultDataPrologue16 = (0xD5, 0xAA, 0xAD)
    kDefaultDataEpilogue16 = (0xDE, 0xAA)
    # maps each valid on-disk nibble to its 6-bit value
    kDefaultNibbleTranslationTable16 = {
        0x96: 0x00, 0x97: 0x01, 0x9A: 0x02, 0x9B: 0x03, 0x9D: 0x04, 0x9E: 0x05, 0x9F: 0x06, 0xA6: 0x07,
        0xA7: 0x08, 0xAB: 0x09, 0xAC: 0x0A, 0xAD: 0x0B, 0xAE: 0x0C, 0xAF: 0x0D, 0xB2: 0x0E, 0xB3: 0x0F,
        0xB4: 0x10, 0xB5: 0x11, 0xB6: 0x12, 0xB7: 0x13, 0xB9: 0x14, 0xBA: 0x15, 0xBB: 0x16, 0xBC: 0x17,
        0xBD: 0x18, 0xBE: 0x19, 0xBF: 0x1A, 0xCB: 0x1B, 0xCD: 0x1C, 0xCE: 0x1D, 0xCF: 0x1E, 0xD3: 0x1F,
        0xD6: 0x20, 0xD7: 0x21, 0xD9: 0x22, 0xDA: 0x23, 0xDB: 0x24, 0xDC: 0x25, 0xDD: 0x26, 0xDE: 0x27,
        0xDF: 0x28, 0xE5: 0x29, 0xE6: 0x2A, 0xE7: 0x2B, 0xE9: 0x2C, 0xEA: 0x2D, 0xEB: 0x2E, 0xEC: 0x2F,
        0xED: 0x30, 0xEE: 0x31, 0xEF: 0x32, 0xF2: 0x33, 0xF3: 0x34, 0xF4: 0x35, 0xF5: 0x36, 0xF6: 0x37,
        0xF7: 0x38, 0xF9: 0x39, 0xFA: 0x3A, 0xFB: 0x3B, 0xFC: 0x3C, 0xFD: 0x3D, 0xFE: 0x3E, 0xFF: 0x3F,
    }

    def __init__(self,
                 address_prologue = kDefaultAddressPrologue16,
                 address_epilogue = kDefaultAddressEpilogue16,
                 data_prologue = kDefaultDataPrologue16,
                 data_epilogue = kDefaultDataEpilogue16,
                 nibble_translate_table = kDefaultNibbleTranslationTable16):
        """Store the field markers and translation table to use."""
        self.address_prologue = address_prologue
        self.address_epilogue = address_epilogue
        self.data_prologue = data_prologue
        self.data_epilogue = data_epilogue
        self.nibble_translate_table = nibble_translate_table
        # track IDs 0x00-0x9F in five bands of 0x20 IDs each; the first band
        # has 12 sectors per track and each later band has one fewer
        self.sectors_per_track = {
            track_id: 0x0C - track_id // 0x20 for track_id in range(0xA0)
        }

    def _(self, track):
        """Read one nibble from track and return its translated value.

        Raises KeyError when the nibble is not in the translation table.
        """
        raw_nibble = next(track.nibble())
        return self.nibble_translate_table[raw_nibble]

    def find_address_prologue(self, track):
        """Search track for the address prologue; returns track.find()'s result."""
        return track.find(self.address_prologue)

    def address_field_at_point(self, track):
        """Decode the five-nibble address field at the cursor.

        Returns a MoofAddressField whose ``valid`` flag is True when the XOR
        of the first four decoded values equals the fifth.
        """
        h0, sector_id, h2, volume, checksum = [self._(track) for unused in range(5)]
        valid = (h0 ^ sector_id ^ h2 ^ volume) == checksum
        # track ID: h0 shifted up one bit, bit 0 of h2 as bit 7,
        # bit 5 of h2 as bit 0
        top_bit = (h2 & 0b00000001) << 7
        low_bit = (h2 & 0b00100000) >> 5
        track_id = (h0 << 1) | top_bit | low_bit
        return MoofAddressField(valid, volume, track_id, sector_id)

    def verify_nibbles_at_point(self, track, nibbles):
        """Read len(nibbles) nibbles from track and report whether they match."""
        found = [next(track.nibble()) for unused in nibbles]
        return tuple(found) == tuple(nibbles)

    def verify_address_epilogue_at_point(self, track):
        """Report whether the address epilogue is next on track."""
        return self.verify_nibbles_at_point(track, self.address_epilogue)

    def find_data_prologue(self, track):
        """Search for the data prologue, giving up at the next address prologue."""
        return track.find_this_not_that(self.data_prologue, self.address_prologue)

    def data_field_at_point(self, track):
        """Decode the data field at the cursor.

        Reads 1 + 700 + 3 nibbles and returns a MoofDataField with the sector
        ID, 12 tag bytes, 512 data bytes and a checksum ``valid`` flag.
        Raises KeyError if any nibble is missing from the translation table.
        """
        # three running checksums, updated as each byte is decoded
        ck1 = ck2 = ck3 = 0

        def decoded_bytes(triples):
            # yields decoded bytes one at a time; statement order matters
            # because the caller stops pulling part-way through a triple
            nonlocal ck1, ck2, ck3
            for first, second, third in triples:
                ck1 = (ck1 & 0xFF) << 1
                if ck1 > 0xFF:
                    ck1 -= 0xFF
                    ck3 += 1
                value = first ^ ck1
                ck3 += value
                yield value
                if ck3 > 0xFF:
                    ck3 &= 0xFF
                    ck2 += 1
                value = second ^ ck3
                ck2 += value
                yield value
                if ck2 > 0xFF:
                    ck2 &= 0xFF
                    ck1 += 1
                value = third ^ ck2
                ck1 += value
                yield value

        def regroup(quad):
            # the first value of each quad carries the top two bits of the
            # three bytes formed from the other three values
            shared, low0, low1, low2 = quad
            return ((low0 & 0x3F) | ((shared << 2) & 0xC0),
                    (low1 & 0x3F) | ((shared << 4) & 0xC0),
                    (low2 & 0x3F) | ((shared << 6) & 0xC0))

        # first nibble is sector number
        sector_id = self._(track)
        # 700 translated nibbles, kept as 175 groups of 4
        quads = [(self._(track), self._(track), self._(track), self._(track))
                 for group_index in range(175)]
        stream = decoded_bytes(map(regroup, quads))
        # 524 bytes in total: 12 tag bytes followed by 512 data bytes
        tags = [next(stream) for unused in range(12)]
        data = [next(stream) for unused in range(512)]
        # top two bits of each checksum are compared with the last nibble
        # already read; the low six bits with the next three nibbles
        expected_top = ((ck1 & 0xC0) >> 6) | ((ck2 & 0xC0) >> 4) | ((ck3 & 0xC0) >> 2)
        valid = quads[-1][-1] == expected_top
        valid &= self._(track) == (ck3 & 0x3F)
        valid &= self._(track) == (ck2 & 0x3F)
        valid &= self._(track) == (ck1 & 0x3F)
        return MoofDataField(valid, sector_id, tags, data)

    def verify_data_epilogue_at_point(self, track):
        """Report whether the data epilogue is next on track."""
        return self.verify_nibbles_at_point(track, self.data_epilogue)
class DefaultLogger:
    """Logger that writes every message to standard error.

    ``info`` and ``error`` are aliases of ``warn``; all three behave the same.
    """

    def warn(self, message, T=None, S=None, X=None, Y=None):
        """Format message and write it, plus a newline, to stderr.

        message is a ``str.format`` template that may reference ``{T}``,
        ``{S}``, ``{X}`` and ``{Y}``. T and S, when supplied, are rendered
        through myhex; X and Y are substituted as given.
        """
        # Test against None, not truthiness: track 0 and sector 0 are real
        # values and must be hex-formatted ("00") like any other.
        if T is not None:
            T = myhex(T)
        if S is not None:
            S = myhex(S)
        # Pass the fields explicitly so the template cannot reach other
        # locals such as self or message.
        sys.stderr.write(message.format(T=T, S=S, X=X, Y=Y))
        sys.stderr.write('\n')

    info = warn
    error = warn
class MoofDiskImage(wozardry.WozDiskImage):
    """Disk image that scans every mapped track for sector blocks on load.

    After construction ``self.blocks`` holds one MoofBlock for each sector
    whose address field and data field passed every check in parse().
    Anything that failed a check is reported through ``self.logger``.
    """

    kE7Bytestream = (0x2B, 0x00, 0x2B, 0xFD, 0x83, 0x6F, 0x20, 0xE2,
                     0x8D, 0x99, 0x49, 0x44, 0x47, 0x82, 0xD9, 0x26,
                     0xFB, 0xC6, 0x3, 0xF8)
    kPACEPrologue = (0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
                     0xFF, 0xFF, 0xFF, 0xFF, 0xAB, 0xCD, 0xEF, 0xEF)
    # parse() gives up on a track after this many revolutions. A track with
    # at least one acceptable address field ends its scan within two
    # revolutions (the first repeated sector ID), so this only triggers on
    # tracks where no address field is ever accepted.
    kMaxScanRevolutions = 3

    def __init__(self, iostream=None, rwtsclass=MoofRWTS, loggerclass=DefaultLogger):
        """Load the image, wrap each track in a MoofTrack, then parse it.

        rwtsclass and loggerclass are instantiated with no arguments.
        """
        wozardry.WozDiskImage.__init__(self, iostream)
        self.logger = loggerclass()
        self.rwts = rwtsclass()
        for i, t in enumerate(self.tracks):
            self.tracks[i] = MoofTrack(t.raw_bytes, t.raw_count)
        self.parse()

    def get_pace_key_at_point(self, track, bit_index):
        """Look for a PACE protection key starting at bit_index.

        Returns the key as eight uppercase hex digits, or an empty string
        when the PACE prologue is not found there. The track's read position
        (bit index and revolution count) is left as it was on entry.
        """
        # save bitstream position
        saved_position = (track.bit_index, track.revolutions)
        track.bit_index = bit_index
        key = []
        if self.rwts.verify_nibbles_at_point(track, self.kPACEPrologue):
            # four nibbles between the prologue and the key are skipped
            for i in range(4):
                next(track.nibble())
            for i in range(4):
                # combine two nibbles into 16 bits, keep the bits at even
                # positions and pack them together into a single byte
                x = (next(track.nibble()) << 8) + next(track.nibble())
                x = x & 0x5555
                x = (x | (x >> 1)) & 0x3333
                x = (x | (x >> 2)) & 0x0f0f
                x = (x | (x >> 4)) & 0x00ff
                x = (x | (x >> 8)) & 0xffff
                key.append(x)
            key.reverse()
        # restore bitstream position, including the revolution counter so
        # this look-aside read does not count as progress around the track
        track.bit_index, track.revolutions = saved_position
        return "".join(map(myhex, key))

    def parse(self):
        """Scan every mapped track and collect its sectors in self.blocks.

        Each address field found is validated (checksum, track ID range,
        sector ID range, epilogue), then its data field is located, decoded
        and validated. Failures are logged and the sector is skipped. A
        track's scan ends at the first repeated sector ID, when no further
        address prologue is found, or after kMaxScanRevolutions revolutions.
        """
        self.blocks = []
        # only 400K and 800K disks supported at the moment
        if self.info["disk_type"] not in (1, 2):
            return
        for track_index in self.tmap:
            # 0xFF entries have no track to read
            if track_index == 0xFF:
                continue
            track = self.tracks[track_index]
            seen_sectors = []
            track_id = -1
            # Without this bound the loop below never ends on a track whose
            # address fields are all rejected: every rejection continues to
            # the next prologue and the track is circular.
            scan_limit = track.revolutions + self.kMaxScanRevolutions
            while (track.revolutions < scan_limit
                   and self.rwts.find_address_prologue(track)):
                try:
                    address_field = self.rwts.address_field_at_point(track)
                except KeyError:
                    # a nibble outside the translate table used to escape
                    # as an uncaught KeyError and abort the whole parse
                    self.logger.warn('Address field contains invalid nibble')
                    continue
                # log if address field checksum doesn't match
                if not address_field.valid:
                    self.logger.warn(
                        'T{T},S{S} Address field checksum invalid',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                # log if track ID is ridiculous
                if not (0x00 <= address_field.track_id <= 0x9F):
                    self.logger.warn(
                        'Address field track ID {T} invalid',
                        T=address_field.track_id
                    )
                    continue
                # log if sector ID is ridiculous
                if not (0x00 <= address_field.sector_id < self.rwts.sectors_per_track[address_field.track_id]):
                    self.logger.warn(
                        'Address field sector ID {S} invalid',
                        S=address_field.sector_id
                    )
                    continue
                # log if address field epilogue isn't next
                if not self.rwts.verify_address_epilogue_at_point(track):
                    self.logger.warn(
                        'T{T},S{S} Address field epilogue invalid',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                # if we see duplicate sector IDs, assume we're done
                if address_field.sector_id in seen_sectors:
                    break
                seen_sectors.append(address_field.sector_id)
                old_bit_index = track.bit_index
                if not self.rwts.find_data_prologue(track):
                    # if we didn't find any data field prologue at all
                    # before the next address field prologue, then check
                    # if this is a specially formatted protection sector
                    # from which we can extract some useful information
                    decryption_key = self.get_pace_key_at_point(track, old_bit_index)
                    if decryption_key:
                        self.logger.info(
                            'T{T},S{S} Found PACE protection, key={X}',
                            T=address_field.track_id,
                            S=address_field.sector_id,
                            X=decryption_key
                        )
                    # NOTE(review): the failed search has already consumed
                    # the next address prologue, so the sector it belongs
                    # to is skipped -- confirm whether that is intended.
                    continue
                try:
                    data_field = self.rwts.data_field_at_point(track)
                except KeyError:
                    # log if GCR decoding failed
                    self.logger.warn(
                        'T{T},S{S} Data field contains invalid nibble',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                # log if checksums didn't match after GCR decoding
                if not data_field.valid:
                    self.logger.warn(
                        'T{T},S{S} Data field checksums invalid',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                # address and data fields are supposed to contain
                # matching sector IDs, so log if they don't match
                if address_field.sector_id != data_field.sector_id:
                    self.logger.warn(
                        'T{T},S{S} Address and data field sector IDs do not match',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                # log if sector data contains E7 bitstream (this is only
                # reported; the sector is still processed)
                if (sum(data_field.data[:0x18E]) == 0) and (tuple(data_field.data[0x18F:0x1A3]) == self.kE7Bytestream):
                    self.logger.warn(
                        'T{T},S{S} Found E7 bitstream',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                # log if data field epilogue isn't next
                if not self.rwts.verify_data_epilogue_at_point(track):
                    self.logger.warn(
                        'T{T},S{S} Data field epilogue invalid',
                        T=address_field.track_id,
                        S=address_field.sector_id
                    )
                    continue
                track_id = address_field.track_id
                self.blocks.append(MoofBlock(address_field, data_field))
            # move on if we didn't find any valid sectors
            if track_id == -1:
                continue
            # log if we didn't find enough sectors
            sector_count = len(seen_sectors)
            expected_count = self.rwts.sectors_per_track[track_id]
            if sector_count < expected_count:
                self.logger.warn(
                    'T{T} Found {X} sectors (expected {Y})',
                    T=track_id,
                    X=sector_count,
                    Y=expected_count
                )
def driver(filename):
    """Open filename in binary mode and build a MoofDiskImage from it.

    Constructing the image parses it, so any findings are logged as a side
    effect; nothing is returned.
    """
    with open(filename, 'rb') as image_file:
        mdisk = MoofDiskImage(image_file)
if __name__ == '__main__':
    # Require the image path: without this check a missing argument
    # surfaces as a bare IndexError traceback.
    if len(sys.argv) < 2:
        sys.stderr.write('usage: ' + sys.argv[0] + ' <disk image>\n')
        sys.exit(1)
    driver(sys.argv[1])