Skip to content

Commit

Permalink
non blocking audio samples in zero_record.py
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolas-f committed Jun 7, 2023
1 parent e784a30 commit 999c607
Showing 1 changed file with 80 additions and 36 deletions.
116 changes: 80 additions & 36 deletions services/zero_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,45 @@
import datetime
import io
import os
import collections
import threading

ALERT_STACK_BYTES = 960000
ALERT_DELAY = 5.0


class ZeroMQThread(threading.Thread):
def __init__(self, args, data: dict):
threading.Thread.__init__(self)
self.samples_queue = collections.deque()
self.args = args
self.data = data
self.last_warning = 0

def push_bytes(self, samples_bytes):
self.samples_queue.append(samples_bytes)
sum_bytes = sum([len(element) for element in self.samples_queue])
if sum_bytes > ALERT_STACK_BYTES and \
time.time() - self.last_warning > ALERT_DELAY:
print("Warning buffer overflowing with %d bytes" % sum_bytes)
self.last_warning = time.time()

def run(self):
interface = self.args.interface
port = self.args.port

context = zmq.Context()
socket = context.socket(zmq.PUB)
address = "tcp://%s:%d" % (interface, port)
socket.bind(address)
print("Publishing samples on interface:")
print(address)
while self.data["running"] or len(self.samples_queue) > 0:
while len(self.samples_queue) > 0:
audio_data_bytes = self.samples_queue.popleft()
socket.send(audio_data_bytes)
time.sleep(0.05)


class AudioFolderPlayListBuffer(io.BytesIO):
def __init__(self, folder_or_file, sample_rate):
Expand Down Expand Up @@ -72,56 +111,61 @@ def read(self, __size: int = ...) -> bytes:
if sr != self.sample_rate:
import resampy
waveform = resampy.resample(waveform, sr, self.sample_rate)
super().__init__(b"")
self.write(waveform.tobytes())
self.seek(0)
super().__init__(b"")
self.write(waveform.tobytes())
self.seek(0)
return super().read(__size)


def publish_samples(args):
interface = args.interface
port = args.port
block_size = args.block_size
byte_rate = args.debug_byte_rate

context = zmq.Context()
socket = context.socket(zmq.PUB)
address = "tcp://%s:%d" % (interface, port)
socket.bind(address)
print("Publishing samples on interface:")
print(address)
if byte_rate > 0:
print("Warning zero_record in debug mode, sampling clocked at %d Hz" %
byte_rate)
if args.wave == "":
input_buffer = sys.stdin.buffer
else:
input_buffer = AudioFolderPlayListBuffer(args.wave, args.sample_rate)
byte_rate = input_buffer.get_bytes_rate()
start = time.time()
total_bytes_read = 0
while True:
audio_data_bytes = input_buffer.read(block_size)
if not audio_data_bytes:
print("%s End of audio samples, total bytes read %d" % (datetime.datetime.now().isoformat(),
total_bytes_read))
break
total_bytes_read += len(audio_data_bytes)
if byte_rate > 0:
# if byte rate provided by user
# pause time before reading the next bytes according to expected byte rate
cur = time.time()
samples_time = len(audio_data_bytes) / byte_rate
if cur - start < samples_time:
time.sleep(samples_time - (cur - start))
start = time.time()
# audio_data contains a list of block_size // 4 floats representing audio values
socket.send(audio_data_bytes)

data = {"running": True}
manager = ZeroMQThread(args=args, data=data)
manager.start()
start = time.time()
try:
while data["running"]:
audio_data_bytes = input_buffer.read(block_size)
if not audio_data_bytes:
print("%s End of audio samples, total bytes read %d" %
(datetime.datetime.now().isoformat(), total_bytes_read))
break
manager.push_bytes(audio_data_bytes)
total_bytes_read += len(audio_data_bytes)
if byte_rate > 0:
# if byte rate provided by user
# pause time before reading the next bytes according to expected byte rate
cur = time.time()
samples_time = len(audio_data_bytes) / byte_rate
if cur - start < samples_time:
time.sleep(samples_time - (cur - start))
start = time.time()
finally:
data["running"] = False

def main():
parser = argparse.ArgumentParser(description='This program read audio stream from std input and publish through'
' zeromq.', epilog='''example: arecord --disable-softvol -D
plughw:CARD=U18dB,DEV=0 -r 48000 -f FLOAT_LE -c 1 -t raw |
python3 -u zero_record.py -p 10001''',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
epilog = "example:\narecord --disable-softvol " \
"-D plughw:CARD=U18dB,DEV=0 -r 48000 -f FLOAT_LE -c 1 -t raw" \
" | python3 -u zero_record.py -p 10001 \n\n" \
"python zero_record.py -w " \
"../third_parties/yamnet/" \
"24968__wwwbonsonca__train_tgv_passing_06.wav"

parser = argparse.ArgumentParser(description=
'This program read audio stream from'
' std input and publish through zeromq.',
epilog=epilog, formatter_class=
argparse.RawTextHelpFormatter)

parser.add_argument("-p", "--port", help="Port to publish samples", default=10001, type=int)
parser.add_argument("-i", "--interface", help="Interface to publish", default="*", type=str)
Expand Down

0 comments on commit 999c607

Please sign in to comment.