-
Notifications
You must be signed in to change notification settings - Fork 5
/
porcupinedecoder.py
130 lines (110 loc) · 4.76 KB
/
porcupinedecoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#
# Copyright 2018 Picovoice Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import struct
import time
from threading import Thread
import pyaudio
import logging
from kalliope.core.ConfigurationManager import SettingLoader
from kalliope import Utils
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from porcupinebinding import Porcupine
# Install a default root handler (no-op if one already exists) and reuse Kalliope's shared logger.
logging.basicConfig()
logger = logging.getLogger("kalliope")

# Absolute directory that contains this module.
TOP_DIR = os.path.split(os.path.abspath(__file__))[0]
# Porcupine resources (native library, model parameters) are looked up under "<TOP_DIR>/lib".
RESOURCE_FILE = os.path.join(TOP_DIR, "lib")
class HotwordDetector(Thread):
    """
    Thread that records audio from an input device, feeds it frame by frame to Porcupine and invokes a
    callback when one of the configured wake words is detected.

    Detection is controlled through two public flags:
      - ``paused``: while True the loop does not read or process audio.
      - ``kill_received``: when True the loop in :meth:`run` exits.
    """

    # Seconds to sleep between two checks of the ``paused`` flag, so a paused detector does not spin the CPU.
    PAUSE_POLL_INTERVAL = 0.1

    def __init__(
            self,
            keyword_file_paths,
            sensitivities,
            input_device_index,
            detected_callback=None
    ):
        """
        Constructor.

        :param keyword_file_paths: Absolute paths to keyword files, either as a comma separated string or as
        an iterable of strings. Surrounding whitespace and empty entries are dropped.
        :param sensitivities: Sensitivity parameter for each wake word. For more information refer to
        'include/pv_porcupine.h'. May be a single number, a comma separated string or an iterable of numbers.
        A single value is used for all keywords.
        :param input_device_index: Optional argument. If provided, audio is recorded from this input device. Otherwise,
        the default audio input device is used.
        :param detected_callback: Callable invoked without arguments each time a keyword is detected. When None,
        detections are only reported through Utils.print_info.
        """
        super(HotwordDetector, self).__init__()

        settings = SettingLoader().settings
        # For the raspberry there are 3 different types of libpv_porcupine.so provided,
        # we use cortex-a53, tested on rpi 2 and 3.
        self._library_path = os.path.join(RESOURCE_FILE, str(settings.machine), "libpv_porcupine.so")
        self._model_file_path = os.path.join(RESOURCE_FILE, "common", "porcupine_params.pv")
        self._input_device_index = input_device_index

        self.kill_received = False
        self.paused = False
        self.detected_callback = detected_callback

        # Created in run(); defined here so terminate() is safe to call before the thread has started.
        self.porcupine = None
        self.pa = None
        self.audio_stream = None

        self._keyword_file_paths = self._parse_keyword_paths(keyword_file_paths)
        self._sensitivities = self._parse_sensitivities(sensitivities, len(self._keyword_file_paths))

    @staticmethod
    def _parse_keyword_paths(keyword_file_paths):
        """
        Normalise keyword paths to a list of non-empty, stripped strings.

        :param keyword_file_paths: Comma separated string or iterable of path strings.
        :return: List of paths.
        """
        # Duck-type the string check so both str and (Python 2) unicode are split.
        if hasattr(keyword_file_paths, "split"):
            keyword_file_paths = keyword_file_paths.split(',')
        stripped = (path.strip() for path in keyword_file_paths)
        return [path for path in stripped if path]

    @staticmethod
    def _parse_sensitivities(sensitivities, keyword_count):
        """
        Normalise sensitivities to a list of floats.

        :param sensitivities: A number, a comma separated string, or an iterable of numbers/strings.
        :param keyword_count: Number of keywords; a single sensitivity is repeated this many times.
        :return: List of floats.
        :raises ValueError: if an entry cannot be converted to float.
        """
        if isinstance(sensitivities, (int, float)):
            values = [float(sensitivities)]
        elif hasattr(sensitivities, "split"):
            values = [float(item) for item in sensitivities.split(',') if item.strip()]
        else:
            values = [float(item) for item in sensitivities]

        # One value means "use it for every keyword".
        if len(values) == 1:
            values = values * keyword_count
        return values

    @staticmethod
    def _keyword_name(keyword_file_path):
        """
        Derive a short display name from a keyword file path: the file name without '.ppn' and
        '_compressed', cut at the first underscore.
        """
        file_name = os.path.basename(keyword_file_path)
        return file_name.replace('.ppn', '').replace('_compressed', '').split('_')[0]

    def run(self):
        """
        Creates an input audio stream, initializes wake word detection (Porcupine) object, and monitors the audio
        stream for occurrences of the wake word(s).

        The loop runs until ``kill_received`` is set. After each detection the detector pauses itself
        (``paused = True``); it resumes when ``paused`` is set back to False from outside.
        """
        keyword_names = [self._keyword_name(path) for path in self._keyword_file_paths]

        for keyword_name, sensitivity in zip(keyword_names, self._sensitivities):
            logger.debug('Listening for %s with sensitivity of %s', keyword_name, sensitivity)

        self.porcupine = Porcupine(
            library_path=self._library_path,
            model_path=self._model_file_path,
            keyword_paths=self._keyword_file_paths,
            sensitivities=self._sensitivities)

        self.pa = pyaudio.PyAudio()
        self.audio_stream = self.pa.open(
            rate=self.porcupine.sample_rate,
            channels=1,
            format=pyaudio.paInt16,
            input=True,
            frames_per_buffer=self.porcupine.frame_length,
            input_device_index=self._input_device_index)

        logger.debug("[Porcupine] detecting...")

        # Keep local references: terminate() resets the attributes to None from another thread.
        porcupine = self.porcupine
        audio_stream = self.audio_stream
        frame_length = porcupine.frame_length
        # One signed 16-bit sample ("h") per frame position; built once instead of on every frame.
        frame_format = "h" * frame_length

        while not self.kill_received:
            if self.paused:
                # Sleep instead of busy-waiting while detection is paused.
                time.sleep(self.PAUSE_POLL_INTERVAL)
                continue

            # Nothing reads the stream while paused, so the first read after resuming may report an
            # input overflow; ask PyAudio not to raise for it.
            pcm = audio_stream.read(frame_length, exception_on_overflow=False)
            pcm = struct.unpack_from(frame_format, pcm)
            result = porcupine.process(pcm)
            if result >= 0:
                message = "Keyword " + str(keyword_names[result]) + " detected"
                Utils.print_info(message)
                if self.detected_callback is not None:
                    self.detected_callback()
                self.paused = True

    def terminate(self):
        """
        Stop detection and release the Porcupine object, the audio stream and PyAudio.
        Users cannot call start() again to detect. Safe to call before run() and more than once.

        :return: None
        """
        # Ask the detection loop to stop before its resources are released.
        self.kill_received = True

        # Detach the handles first so a repeated call cannot release them twice.
        porcupine, self.porcupine = self.porcupine, None
        audio_stream, self.audio_stream = self.audio_stream, None
        pa, self.pa = self.pa, None

        if porcupine is not None:
            porcupine.delete()
        if audio_stream is not None:
            audio_stream.close()
        if pa is not None:
            pa.terminate()
        logger.debug("[Porcupine] Audio stream cleaned.")