-
Notifications
You must be signed in to change notification settings - Fork 0
/
sequencer.py
99 lines (89 loc) · 3.04 KB
/
sequencer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class TempoMap(list):
    """List of tempo events for one stream, with per-event timing data.

    Events added through :meth:`add` gain an ``mpt`` attribute
    (milliseconds per tick).  :meth:`update` sorts the map and fills in
    each event's ``msdelay`` from the event that precedes it.
    """

    def __init__(self, stream):
        """Create an empty tempo map bound to *stream*.

        ``stream.resolution`` is read every time an event is added.
        """
        list.__init__(self)
        self.stream = stream

    def add_and_update(self, event):
        """Add *event*, then re-sort the map and recompute running delays."""
        self.add(event)
        self.update()

    def add(self, event):
        """Compute ``event.mpt`` and append *event* (no sorting is done).

        ``event.mpqn`` is treated as microseconds per beat and
        ``stream.resolution`` as the divisor that turns a per-beat value
        into a per-tick value (presumably ticks per beat -- confirm
        against the stream class).
        """
        # microseconds per beat -> milliseconds per beat
        ms_per_beat = event.mpqn / 1000.0
        # milliseconds per beat -> milliseconds per tick
        event.mpt = ms_per_beat / self.stream.resolution
        self.append(event)

    def update(self):
        """Sort the map in place and recompute ``msdelay`` for later events.

        The first event's ``msdelay`` is never written here, so it must
        already be set when the map holds more than one event.  Sorting
        relies on the events' own ordering.
        """
        self.sort()
        # adjust running time
        previous = None
        for event in self:
            # Identity check: an event object that happens to be falsy
            # must still count as "there is a previous event".
            if previous is not None:
                elapsed_ms = int(previous.mpt * (event.tick - previous.tick))
                event.msdelay = previous.msdelay + elapsed_ms
            previous = event

    def get_tempo(self, offset=0):
        """Return the tempo event in effect at tick *offset*.

        That is the last event before the first event (after index 0)
        whose ``tick`` is greater than *offset*; the first event is
        returned when no later one applies.

        Raises:
            IndexError: if the map is empty.
        """
        if not self:
            raise IndexError("tempo map is empty")
        current = self[0]
        for candidate in self[1:]:
            if candidate.tick > offset:
                break
            current = candidate
        return current
class EventStreamIterator(object):
    """Iterate over a stream's events in fixed-length time windows.

    Each step advances a tick-valued window edge by ``window`` time units
    (the same unit as the tempo events' ``mpt`` numerator, i.e.
    milliseconds if ``mpt`` is ms per tick) and returns the list of
    events whose ``tick`` does not exceed that edge.  Tempo changes that
    fall inside a window are accounted for when computing the edge.
    """

    def __init__(self, stream, window):
        """Prepare iteration over *stream* in windows of length *window*.

        *stream* must provide ``trackpool``, ``iterevents()``,
        ``tempomap`` (a non-empty sequence of events with ``tick`` and
        ``mpt``) and ``endoftrack.tick``.
        """
        self.stream = stream
        self.trackpool = stream.trackpool
        self.window_length = window
        self.window_edge = 0
        self.leftover = None
        self.events = self.stream.iterevents()
        # Look ahead to find the ticks at which the tempo changes: every
        # tempo marker after the first one, followed by the end-of-track
        # tick.
        timepoints = [tempo.tick for tempo in stream.tempomap[1:]]
        timepoints.append(stream.endoftrack.tick)
        self.ttpts = iter(timepoints)
        # Set up the next tempo timepoint.
        self.ttp = next(self.ttpts)
        self.tempomap = iter(self.stream.tempomap)
        # Use the next() builtin rather than the Python-2-only .next()
        # method, matching how self.ttpts is advanced.
        self.tempo = next(self.tempomap)
        self.endoftrack = False

    def __iter__(self):
        """Return self; the object is its own iterator."""
        return self

    def __next_edge(self):
        """Advance ``window_edge`` by one window.

        Raises:
            StopIteration: if the end of track was reached by an earlier
                call.
        """
        if self.endoftrack:
            raise StopIteration
        lastedge = self.window_edge
        self.window_edge += int(self.window_length / self.tempo.mpt)
        if self.window_edge > self.ttp:
            # We're past the tempo marker.
            oldttp = self.ttp
            try:
                self.ttp = next(self.ttpts)
            except StopIteration:
                # End of track: clamp the edge to the final timepoint.
                self.window_edge = self.ttp
                self.endoftrack = True
                return
            # Calculate the next window edge, taking into account the
            # tempo change: spend what is left of the window at the new
            # tempo, starting from the new tempo marker's tick.
            msused = (oldttp - lastedge) * self.tempo.mpt
            msleft = self.window_length - msused
            self.tempo = next(self.tempomap)
            ticksleft = msleft / self.tempo.mpt
            # NOTE: not truncated to int, so the edge may become
            # fractional after a tempo change.
            self.window_edge = ticksleft + self.tempo.tick

    def __next__(self):
        """Return the list of events that fall inside the next window.

        The list may be empty.  An event read past the window edge is
        held back in ``leftover`` and delivered by a later call.

        Raises:
            StopIteration: once the end-of-track window has been
                returned.
        """
        ret = []
        self.__next_edge()
        if self.leftover:
            if self.leftover.tick > self.window_edge:
                # The held-back event is still beyond this window.
                return ret
            ret.append(self.leftover)
            self.leftover = None
        for event in self.events:
            if event.tick > self.window_edge:
                self.leftover = event
                return ret
            ret.append(event)
        return ret