-
Notifications
You must be signed in to change notification settings - Fork 5
/
porcupine.py
121 lines (102 loc) · 3.82 KB
/
porcupine.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import logging
import os
import sys
from threading import Thread
from kalliope import Utils
from cffi import FFI as _FFI
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from porcupinedecoder import HotwordDetector
class PorcupineWakeWordNotFound(Exception):
    """Raised when a configured porcupine keyword (ppn) file cannot be found."""
class MissingParameterException(Exception):
    """Raised when a required parameter (callback, keywords) is not supplied."""
# Give the root logger a default handler; basicConfig() does nothing when the
# root logger already has handlers configured.
logging.basicConfig()
# Module-wide logger, registered under the "kalliope" logger name.
logger = logging.getLogger("kalliope")
class Porcupine(Thread):
    """Thread that drives a porcupine ``HotwordDetector``.

    The constructor validates its keyword arguments, resolves every keyword
    file and builds the detector; ``run`` then starts the detector and waits
    for it, while ``pause``/``unpause``/``stop`` control it.
    """

    # Sensitivity used for a keyword that does not configure its own.
    DEFAULT_SENSITIVITY = 0.5

    def __init__(self, **kwargs):
        """
        Validate the settings and build the hotword detector.

        :param callback: function handed to the detector as ``detected_callback`` (required)
        :param keywords: non-empty list of dicts shaped like
            ``{'keyword': {'ppn_file': <path>, 'sensitivity': <value>}}``;
            ``sensitivity`` is optional and defaults to ``DEFAULT_SENSITIVITY``
        :param input_device_index: optional input device index forwarded to the detector
        :raises MissingParameterException: if ``callback`` is missing or ``keywords`` is missing/empty
        :raises PorcupineWakeWordNotFound: if a keyword file cannot be resolved to an existing file
        """
        super(Porcupine, self).__init__()
        self._ignore_stderr()

        # Pause listening boolean.
        self.kill_received = False
        # Set to True by stop(); initialised here so it exists before stop() is called.
        self.interrupted = False

        # Get input device if set by the user.
        self.input_device_index = kwargs.get('input_device_index', None)

        # Callback function to call when hotword caught.
        self.callback = kwargs.get('callback', None)
        if self.callback is None:
            raise MissingParameterException("Callback function is required with porcupine")

        # Get keywords to load. An empty list is rejected as well as None,
        # because the detector needs at least one keyword file.
        self.keywords = kwargs.get('keywords', None)
        if not self.keywords:
            raise MissingParameterException("At least one keyword is required with porcupine")

        keyword_file_paths = []
        sensitivities = []
        for keyword in self.keywords:
            settings = keyword['keyword']
            ppn_file = settings['ppn_file']
            path = Utils.get_real_file_path(ppn_file)
            # Check the result explicitly: a path that is None or does not
            # point to a file must be reported, not passed on to the detector.
            if path is None or not os.path.isfile(path):
                raise PorcupineWakeWordNotFound("The porcupine keyword at %s does not exist" % ppn_file)
            keyword_file_paths.append(path)
            sensitivities.append(settings.get('sensitivity', self.DEFAULT_SENSITIVITY))

        # The detector is given both lists as ", "-separated strings.
        keyword_file_paths = ", ".join(keyword_file_paths)
        sensitivities = ", ".join(map(str, sensitivities))

        self.detector = HotwordDetector(keyword_file_paths=keyword_file_paths,
                                        sensitivities=sensitivities,
                                        input_device_index=self.input_device_index,
                                        detected_callback=self.callback)

    def run(self):
        """
        Start the porcupine thread and wait for a Kalliope trigger word

        The detector is flagged as daemon, started, and then joined, so this
        method blocks until the detector finishes.
        :return:
        """
        # start porcupine loop forever
        self.detector.daemon = True
        self.detector.start()
        self.detector.join()

    def pause(self):
        """
        pause the porcupine main thread (sets ``detector.paused`` to True)
        """
        logger.debug("[Porcupine] Pausing porcupine process")
        self.detector.paused = True

    def unpause(self):
        """
        unpause the porcupine main thread (sets ``detector.paused`` to False)
        """
        logger.debug("[Porcupine] Unpausing porcupine process")
        self.detector.paused = False

    def stop(self):
        """
        Kill the porcupine process

        Marks this thread as interrupted and terminates the detector.
        :return:
        """
        logger.debug("[Porcupine] Killing porcupine process")
        self.interrupted = True
        self.detector.terminate()

    @staticmethod
    def _ignore_stderr():
        """
        Try to forward PortAudio messages from stderr to /dev/null.

        The C-level ``stderr`` stream is rebound to a handle opened on
        ``os.devnull``. If neither known symbol can be written, the handle is
        closed again and stderr is left untouched.
        """
        ffi = _FFI()
        ffi.cdef("""
        /* from stdio.h */
        extern FILE* fopen(const char* path, const char* mode);
        extern int fclose(FILE* fp);
        extern FILE* stderr; /* GNU C library */
        extern FILE* __stderrp; /* Mac OS X */
        """)
        stdio = ffi.dlopen(None)
        devnull = stdio.fopen(os.devnull.encode(), b'w')
        try:
            stdio.stderr = devnull
        except KeyError:
            try:
                # Written via setattr with a string name on purpose: inside a
                # class body ``stdio.__stderrp`` would be name-mangled to
                # ``stdio._Porcupine__stderrp`` and miss the real symbol.
                setattr(stdio, '__stderrp', devnull)
            except KeyError:
                stdio.fclose(devnull)