Skip to content

Commit

Permalink
Implement the skoolkit.tape module
Browse files Browse the repository at this point in the history
  • Loading branch information
skoolkid committed Jun 19, 2024
1 parent ed6f1b8 commit eb8a179
Show file tree
Hide file tree
Showing 7 changed files with 750 additions and 699 deletions.
5 changes: 4 additions & 1 deletion skoolkit/loadtracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ def get_edges(blocks, first_edge, polarity, analyse=False):
if analyse:
print('T-states EAR Description')

for i, (block_id, timings, data) in enumerate(blocks):
for i, block in enumerate(blocks):
timings = block.timings
data = block.data

# Pilot tone
if analyse and timings.pilot_len:
ear = (len(edges) - 1) % 2
Expand Down
263 changes: 32 additions & 231 deletions skoolkit/tap2sna.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@
from urllib.request import Request, urlopen
from urllib.parse import urlparse

from skoolkit import (SkoolKitError, CSimulator, CCMIOSimulator, get_dword,
get_int_param, get_object, get_word, get_word3, integer,
open_file, parse_int, read_bin_file, write_line, ROM48,
VERSION)
from skoolkit import (SkoolKitError, CSimulator, CCMIOSimulator, get_int_param,
get_object, get_word, integer, open_file, parse_int,
read_bin_file, write_line, ROM48, VERSION)
from skoolkit.cmiosimulator import CMIOSimulator
from skoolkit.config import get_config, show_config, update_options
from skoolkit.kbtracer import KeyboardTracer
Expand All @@ -36,6 +35,7 @@
from skoolkit.simulator import Simulator
from skoolkit.simutils import FRAME_DURATIONS, INT_ACTIVE, PC, T, get_state
from skoolkit.snapshot import move, poke, print_reg_help, print_state_help, write_snapshot
from skoolkit.tape import parse_tap, parse_tzx

SYSVARS = (
255, 0, 0, 0, # 23552 - KSTATE0
Expand Down Expand Up @@ -239,25 +239,6 @@ def convert_arg_line_to_args(self, arg_line):
class TapeError(Exception):
pass

class TapeBlockTimings:
def __init__(self, pilot_len=0, pilot=0, sync=(), zero=0, one=0, pause=0, used_bits=8, pulses=(), error=None):
self.pilot_len = pilot_len
self.pilot = pilot
self.sync = sync
self.zero = zero
self.one = one
self.pause = pause
self.used_bits = used_bits
self.pulses = pulses
self.error = error

def get_tape_block_timings(first_byte, pause=3500000):
if first_byte == 0:
# Header block
return TapeBlockTimings(8063, 2168, (667, 735), 855, 1710, pause)
# Data block
return TapeBlockTimings(3223, 2168, (667, 735), 855, 1710, pause)

def _write_snapshot(ram, options, fname):
parent_dir = os.path.dirname(fname)
if parent_dir and not os.path.isdir(parent_dir):
Expand Down Expand Up @@ -423,11 +404,11 @@ def sim_load(blocks, options, config):
if b > 0xFF:
memory[a + 1] = b // 256
is_code = False
for block_id, timings, block_data in blocks:
if block_id == 0x15:
for block in blocks:
if block.block_id == 0x15:
break
if block_data:
if len(block_data) >= 19 and tuple(block_data[0:2]) == (0, 3):
if block.data:
if len(block.data) >= 19 and tuple(block.data[0:2]) == (0, 3):
is_code = True
break
if is_code:
Expand Down Expand Up @@ -542,217 +523,37 @@ def _get_ram(blocks, options):
_ram_operations(snapshot, options.ram_ops, blocks)
return snapshot[16384:]

def _get_tzx_block(data, i, sim):
# https://worldofspectrum.net/features/TZXformat.html
block_id = data[i]
tape_data = None
timings = None
i += 1
if block_id == 16:
# Standard speed data block
pause = get_word(data, i) * 3500
length = get_word(data, i + 2)
tape_data = data[i + 4:i + 4 + length]
if tape_data:
timings = get_tape_block_timings(tape_data[0], pause)
i += 4 + length
elif block_id == 17:
# Turbo speed data block
length = get_word3(data, i + 15)
tape_data = data[i + 18:i + 18 + length]
pilot = get_word(data, i)
sync1 = get_word(data, i + 2)
sync2 = get_word(data, i + 4)
zero = get_word(data, i + 6)
one = get_word(data, i + 8)
pilot_len = get_word(data, i + 10)
used_bits = data[i + 12]
pause = get_word(data, i + 13) * 3500
timings = TapeBlockTimings(pilot_len, pilot, (sync1, sync2), zero, one, pause, used_bits)
i += 18 + length
elif block_id == 18:
# Pure tone
if sim:
tape_data = []
pilot = get_word(data, i)
pilot_len = get_word(data, i + 2)
timings = TapeBlockTimings(pilot_len, pilot)
i += 4
elif block_id == 19:
# Sequence of pulses of various lengths
length = 2 * data[i]
if sim:
tape_data = []
pulses = [get_word(data, j) for j in range(i + 1, i + 1 + length, 2)]
timings = TapeBlockTimings(sync=pulses)
i += length + 1
elif block_id == 20:
# Pure data block
length = get_word3(data, i + 7)
tape_data = data[i + 10:i + 10 + length]
if sim:
zero = get_word(data, i)
one = get_word(data, i + 2)
used_bits = data[i + 4]
pause = get_word(data, i + 5) * 3500
timings = TapeBlockTimings(zero=zero, one=one, pause=pause, used_bits=used_bits)
i += 10 + length
elif block_id == 21:
# Direct recording block
if sim:
tape_data = []
tps = get_word(data, i)
pause = get_word(data, i + 2)
used_bits = data[i + 4]
num_bytes = get_word3(data, i + 5)
j = 0
pulses = []
prev_bit = data[i + 8] & 0x80
if prev_bit:
pulses.append(0)
bit_count = 0
for j, b in enumerate(data[i + 8:i + 8 + num_bytes], 1):
if j < num_bytes:
num_bits = 8
else:
num_bits = used_bits
for k in range(num_bits):
bit = b & 0x80
if bit == prev_bit:
bit_count += 1
else:
pulses.append(bit_count * tps)
prev_bit = bit
bit_count = 1
b *= 2
pulses.append(bit_count * tps)
timings = TapeBlockTimings(pause=pause, pulses=pulses)
i += get_word3(data, i + 5) + 8
elif block_id == 24:
# CSW recording block
if sim:
timings = TapeBlockTimings(error="TZX CSW Recording (0x18) not supported")
i += get_dword(data, i) + 4
elif block_id == 25:
# Generalized data block
if sim:
timings = TapeBlockTimings(error="TZX Generalized Data Block (0x19) not supported")
i += get_dword(data, i) + 4
elif block_id == 32:
# Pause (silence) or 'Stop the tape' command
if sim:
pause = get_word(data, i) * 3500
timings = TapeBlockTimings(pause=pause)
i += 2
elif block_id == 33:
# Group start
i += data[i] + 1
elif block_id == 34:
# Group end
pass
elif block_id == 35:
# Jump to block
i += 2
elif block_id == 36:
# Loop start
i += 2
elif block_id == 37:
# Loop end
pass
elif block_id == 38:
# Call sequence
i += get_word(data, i) * 2 + 2
elif block_id == 39:
# Return from sequence
pass
elif block_id == 40:
# Select block
i += get_word(data, i) + 2
elif block_id == 42:
# Stop the tape if in 48K mode
i += 4
elif block_id == 43:
# Set signal level
i += 5
elif block_id == 48:
# Text description
i += data[i] + 1
elif block_id == 49:
# Message block
i += data[i + 1] + 2
elif block_id == 50:
# Archive info
i += get_word(data, i) + 2
elif block_id == 51:
# Hardware type
i += data[i] * 3 + 1
elif block_id == 53:
# Custom info block
i += get_dword(data, i + 16) + 20
elif block_id == 90:
# "Glue" block
i += 9
else:
raise TapeError('Unknown TZX block ID: 0x{:X}'.format(block_id))
return i, block_id, timings, tape_data

def _get_tzx_blocks(data, sim, start, stop, is48):
signature = ''.join(chr(b) for b in data[:7])
if signature != 'ZXTape!':
raise TapeError("Not a TZX file")
i = 10
tape = parse_tzx(data, start, stop, False, sim)
blocks = []
block_num = 1
loop = None
while i < len(data):
if block_num >= stop > 0:
break
i, block_id, timings, tape_data = _get_tzx_block(data, i, sim)
if block_num >= start:
if timings and timings.error:
raise TapeError(timings.error)
if sim:
if block_id == 0x20:
if stop == 0 and timings.pause == 0:
break
elif block_id == 0x24:
loop = []
repetitions = get_word(data, i - 2)
elif block_id == 0x2A and stop == 0 and is48:
# Stop the tape if in 48K mode
for block in tape.blocks:
if block.timings and block.timings.error:
raise TapeError(block.timings.error)
if sim:
if block.block_id == 0x20:
if stop == 0 and block.timings.pause == 0:
break
if loop is None:
blocks.append((block_id, timings, tape_data))
else:
loop.append((block_id, timings, tape_data))
if block_id == 0x25 and loop is not None:
blocks.extend(loop * repetitions)
loop = None
block_num += 1
return blocks

def get_tap_blocks(tap, start=1, stop=0):
blocks = []
block_num = 1
i = 0
while i + 1 < len(tap):
if block_num >= stop > 0:
break
block_len = tap[i] + 256 * tap[i + 1]
i += 2
if block_num >= start:
data = tap[i:i + block_len]
if data:
timings = get_tape_block_timings(data[0])
blocks.append((None, timings, data))
i += block_len
block_num += 1
elif block.block_id == 0x24:
loop = []
repetitions = get_word(block.block_data, 0)
elif block.block_id == 0x2A and stop == 0 and is48:
# Stop the tape if in 48K mode
break
if loop is None:
blocks.append(block)
else:
loop.append(block)
if block.block_id == 0x25 and loop is not None:
blocks.extend(loop * repetitions)
loop = None
return blocks

def _get_tape_blocks(tape_type, tape, sim, start, stop, is48):
if tape_type.lower() == 'tzx':
return _get_tzx_blocks(tape, sim, start, stop, is48)
return get_tap_blocks(tape, start, stop)
tap = parse_tap(tape, start, stop)
return [block for block in tap.blocks if block.data]

def _get_tape(urlstring, user_agent, member):
url = urlparse(urlstring)
Expand Down Expand Up @@ -1043,12 +844,12 @@ def make_snapshot(url, options, outfile, config):
is48 = True
tape_blocks = _get_tape_blocks(tape_type, tape, options.sim_load, options.tape_start, options.tape_stop, is48)
if options.sim_load:
blocks = [b for b in tape_blocks if b[1]]
blocks = [block for block in tape_blocks if block.timings]
if not blocks:
raise TapeError('Tape is empty')
ram = sim_load(blocks, options, config)
else:
blocks = [b[2] for b in tape_blocks]
blocks = [block.data for block in tape_blocks]
ram = _get_ram(blocks, options)
if outfile is None:
if tape_name.lower().endswith(('.tap', '.tzx')):
Expand Down
Loading

0 comments on commit eb8a179

Please sign in to comment.